Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/util.py: 33%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# orm/util.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
9from __future__ import annotations
11import enum
12import functools
13import re
14import types
15import typing
16from typing import AbstractSet
17from typing import Any
18from typing import Callable
19from typing import cast
20from typing import Dict
21from typing import FrozenSet
22from typing import Generic
23from typing import get_origin
24from typing import Iterable
25from typing import Iterator
26from typing import List
27from typing import Literal
28from typing import Match
29from typing import Optional
30from typing import Protocol
31from typing import Sequence
32from typing import Tuple
33from typing import Type
34from typing import TYPE_CHECKING
35from typing import TypeVar
36from typing import Union
37import weakref
39from . import attributes # noqa
40from . import exc as orm_exc
41from ._typing import _O
42from ._typing import insp_is_aliased_class
43from ._typing import insp_is_mapper
44from ._typing import prop_is_relationship
45from .base import _class_to_mapper as _class_to_mapper
46from .base import _MappedAnnotationBase
47from .base import _never_set as _never_set # noqa: F401
48from .base import _none_only_set as _none_only_set # noqa: F401
49from .base import _none_set as _none_set # noqa: F401
50from .base import attribute_str as attribute_str # noqa: F401
51from .base import class_mapper as class_mapper
52from .base import DynamicMapped
53from .base import InspectionAttr as InspectionAttr
54from .base import instance_str as instance_str # noqa: F401
55from .base import Mapped
56from .base import object_mapper as object_mapper
57from .base import object_state as object_state # noqa: F401
58from .base import opt_manager_of_class
59from .base import ORMDescriptor
60from .base import state_attribute_str as state_attribute_str # noqa: F401
61from .base import state_class_str as state_class_str # noqa: F401
62from .base import state_str as state_str # noqa: F401
63from .base import WriteOnlyMapped
64from .interfaces import CriteriaOption
65from .interfaces import MapperProperty as MapperProperty
66from .interfaces import ORMColumnsClauseRole
67from .interfaces import ORMEntityColumnsClauseRole
68from .interfaces import ORMFromClauseRole
69from .path_registry import PathRegistry as PathRegistry
70from .. import event
71from .. import exc as sa_exc
72from .. import inspection
73from .. import sql
74from .. import util
75from ..engine.result import result_tuple
76from ..sql import coercions
77from ..sql import expression
78from ..sql import lambdas
79from ..sql import roles
80from ..sql import util as sql_util
81from ..sql import visitors
82from ..sql._typing import is_selectable
83from ..sql.annotation import SupportsCloneAnnotations
84from ..sql.base import WriteableColumnCollection
85from ..sql.cache_key import HasCacheKey
86from ..sql.cache_key import MemoizedHasCacheKey
87from ..sql.elements import ColumnElement
88from ..sql.elements import KeyedColumnElement
89from ..sql.schema import MetaData
90from ..sql.selectable import FromClause
91from ..util.langhelpers import MemoizedSlots
92from ..util.typing import de_stringify_annotation as _de_stringify_annotation
93from ..util.typing import eval_name_only as _eval_name_only
94from ..util.typing import fixup_container_fwd_refs
95from ..util.typing import GenericProtocol
96from ..util.typing import is_origin_of_cls
97from ..util.typing import TupleAny
98from ..util.typing import Unpack
100if typing.TYPE_CHECKING:
101 from ._typing import _EntityType
102 from ._typing import _IdentityKeyType
103 from ._typing import _InternalEntityType
104 from ._typing import _ORMCOLEXPR
105 from .context import _MapperEntity
106 from .context import _ORMCompileState
107 from .decl_api import RegistryType
108 from .mapper import Mapper
109 from .path_registry import _AbstractEntityRegistry
110 from .query import Query
111 from .relationships import RelationshipProperty
112 from ..engine import Row
113 from ..engine import RowMapping
114 from ..sql._typing import _CE
115 from ..sql._typing import _ColumnExpressionArgument
116 from ..sql._typing import _EquivalentColumnMap
117 from ..sql._typing import _FromClauseArgument
118 from ..sql._typing import _OnClauseArgument
119 from ..sql._typing import _PropagateAttrsType
120 from ..sql.annotation import _SA
121 from ..sql.base import ReadOnlyColumnCollection
122 from ..sql.elements import BindParameter
123 from ..sql.selectable import _ColumnsClauseElement
124 from ..sql.selectable import Select
125 from ..sql.selectable import Selectable
126 from ..sql.visitors import anon_map
127 from ..util.typing import _AnnotationScanType
128 from ..util.typing import _MatchedOnType
# Type variable bound to ``Any``; used in the signatures of helpers
# defined in this module.
_T = TypeVar("_T", bound=Any)

# Every name accepted as a cascade option, including the "all" and
# "none" shorthands.
all_cascades = frozenset(
    {
        "save-update",
        "merge",
        "refresh-expire",
        "expunge",
        "delete",
        "delete-orphan",
        "all",
        "none",
    }
)

# A partial of ``functools.partial`` itself: calling
# ``_de_stringify_partial(fn)`` returns ``fn`` with its ``locals_``
# keyword pre-bound to an immutable mapping of the ORM annotation
# container names below.
_de_stringify_partial = functools.partial(
    functools.partial,
    locals_=util.immutabledict(
        {
            "Mapped": Mapped,
            "WriteOnlyMapped": WriteOnlyMapped,
            "DynamicMapped": DynamicMapped,
        }
    ),
)
# partial is practically useless as we have to write out the whole
# function and maintain the signature anyway


class _DeStringifyAnnotation(Protocol):
    """Typing-only description of the ``de_stringify_annotation`` callable.

    The public callable is built with ``functools.partial``, which does
    not carry a precise signature for type checkers; this protocol spells
    the signature out by hand so it can be applied with ``cast()``.
    """

    def __call__(
        self,
        cls: Type[Any],
        annotation: _AnnotationScanType,
        originating_module: str,
        *,
        str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
        include_generic: bool = False,
    ) -> _MatchedOnType: ...


# ``_de_stringify_annotation`` with ``locals_`` pre-bound by
# ``_de_stringify_partial``; the cast only affects static typing.
de_stringify_annotation = cast(
    _DeStringifyAnnotation, _de_stringify_partial(_de_stringify_annotation)
)
class _EvalNameOnly(Protocol):
    """Typing-only description of the ``eval_name_only`` callable.

    Exists so that the ``functools.partial`` object assigned to
    ``eval_name_only`` can be given an explicit signature via ``cast()``.
    """

    def __call__(self, name: str, module_name: str) -> Any: ...


# ``_eval_name_only`` with ``locals_`` pre-bound by
# ``_de_stringify_partial``; the cast only affects static typing.
eval_name_only = cast(_EvalNameOnly, _de_stringify_partial(_eval_name_only))
class CascadeOptions(FrozenSet[str]):
    """Frozen set of the cascade names passed to
    :paramref:`.relationship.cascade`.

    In addition to set membership, each individual cascade is exposed as a
    boolean attribute (``save_update``, ``delete``, ...).
    """

    # cascades implied by "all"; "delete-orphan" is deliberately excluded
    _add_w_all_cascades = all_cascades.difference(
        ["all", "none", "delete-orphan"]
    )
    _allowed_cascades = all_cascades

    _viewonly_cascades = ["expunge", "all", "none", "refresh-expire", "merge"]

    __slots__ = (
        "save_update",
        "delete",
        "refresh_expire",
        "merge",
        "expunge",
        "delete_orphan",
    )

    save_update: bool
    delete: bool
    refresh_expire: bool
    merge: bool
    expunge: bool
    delete_orphan: bool

    def __new__(
        cls, value_list: Optional[Union[Iterable[str], str]]
    ) -> CascadeOptions:
        """Build the option set.

        :param value_list: an iterable of cascade names, a comma-separated
         string of names, or ``None`` (treated like an empty string).
        :raises sqlalchemy.exc.ArgumentError: if any name is not one of
         ``all_cascades``.
        """
        # strings and None are parsed first, then re-enter this
        # constructor with a list of names
        if value_list is None or isinstance(value_list, str):
            return cls.from_string(value_list)  # type: ignore

        values = set(value_list)
        unknown = values.difference(cls._allowed_cascades)
        if unknown:
            raise sa_exc.ArgumentError(
                "Invalid cascade option(s): %s"
                % ", ".join(repr(name) for name in sorted(unknown))
            )

        # expand "all", let "none" wipe everything, and never store the
        # "all" shorthand itself
        if "all" in values:
            values.update(cls._add_w_all_cascades)
        if "none" in values:
            values.clear()
        values.discard("all")

        self = super().__new__(cls, values)

        # each slot is named after its cascade with "-" spelled as "_"
        for slot in CascadeOptions.__slots__:
            setattr(self, slot, slot.replace("_", "-") in values)

        if self.delete_orphan and not self.delete:
            util.warn("The 'delete-orphan' cascade option requires 'delete'.")
        return self

    def __repr__(self):
        joined = ",".join(sorted(self))
        return "CascadeOptions(%r)" % joined

    @classmethod
    def from_string(cls, arg):
        """Parse a comma-separated string of cascade names.

        Whitespace around commas is ignored and empty entries are dropped;
        ``None`` is handled like an empty string.
        """
        names = list(filter(None, re.split(r"\s*,\s*", arg or "")))
        return cls(names)
def _metadata_for_cls(cls: Type[Any], registry: RegistryType) -> MetaData:
    """Return the ``MetaData`` to use for ``cls``.

    The class's own ``metadata`` attribute is used when it is a
    :class:`_schema.MetaData` instance; otherwise the registry's
    ``metadata`` is returned.
    """
    candidate = getattr(cls, "metadata", None)
    # isinstance() is already False for None, so a single check covers both
    # "attribute missing" and "attribute is something else"
    return candidate if isinstance(candidate, MetaData) else registry.metadata
def _validator_events(desc, key, validator, include_removes, include_backrefs):
    """Attach attribute-event listeners that run ``validator`` on values
    being set on or appended to ``desc``.

    :param desc: the event target passed to ``event.listen()``.
    :param key: attribute name; passed to the validator and used to look
     up the attribute's impl on the state's manager.
    :param validator: called as ``validator(obj, key, value)``, or as
     ``validator(obj, key, value, is_remove)`` when ``include_removes``
     is set.
    :param include_removes: when true, a "remove" listener is also
     established and the validator receives the extra boolean argument.
    :param include_backrefs: when false, events whose initiator impl is
     not this attribute's own impl are passed through unvalidated.
    """

    def originates_elsewhere(state, initiator):
        # an initiator carrying a different impl than this attribute's own
        # is treated as a backref-originated event
        own_impl = state.manager[key].impl
        return initiator.impl is not own_impl

    def should_validate(state, initiator):
        return include_backrefs or not originates_elsewhere(state, initiator)

    # extra positional argument for non-remove operations; empty when the
    # validator uses the three-argument form
    not_a_remove = (False,) if include_removes else ()

    def append(state, value, initiator):
        # bulk replace is validated by the "bulk_replace" listener instead
        if initiator.op is attributes.OP_BULK_REPLACE or not should_validate(
            state, initiator
        ):
            return value
        return validator(state.obj(), key, value, *not_a_remove)

    def bulk_set(state, values, initiator):
        if should_validate(state, initiator):
            obj = state.obj()
            # replace the contents of the list in place
            values[:] = [
                validator(obj, key, value, *not_a_remove) for value in values
            ]

    def set_(state, value, oldvalue, initiator):
        if not should_validate(state, initiator):
            return value
        return validator(state.obj(), key, value, *not_a_remove)

    def remove(state, value, initiator):
        if should_validate(state, initiator):
            validator(state.obj(), key, value, True)

    event.listen(desc, "append", append, raw=True, retval=True)
    event.listen(desc, "bulk_replace", bulk_set, raw=True)
    event.listen(desc, "set", set_, raw=True, retval=True)
    if include_removes:
        event.listen(desc, "remove", remove, raw=True, retval=True)
def polymorphic_union(
    table_map, typecolname, aliasname="p_union", cast_nulls=True
):
    """Create a ``UNION`` statement used by a polymorphic mapper.

    See :ref:`concrete_inheritance` for an example of how
    this is used.

    Each value in ``table_map`` is coerced to a FROM clause and written
    back into ``table_map`` under the same key.

    :param table_map: mapping of polymorphic identities to
     :class:`_schema.Table` objects.
    :param typecolname: string name of a "discriminator" column, which will be
     derived from the query, producing the polymorphic identity for
     each row.  If ``None``, no polymorphic discriminator is generated.
    :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()`
     construct generated.
    :param cast_nulls: if True, non-existent columns, which are represented
     as labeled NULLs, will be passed into CAST.   This is a legacy behavior
     that is problematic on some backends such as Oracle - in which case it
     can be set to False.
    :raises sqlalchemy.exc.InvalidRequestError: if any table has a column
     whose key equals ``typecolname``.

    """

    all_keys: util.OrderedSet[str] = util.OrderedSet()
    columns_by_table = {}
    type_by_key = {}

    for identity in table_map:
        selectable = coercions.expect(
            roles.FromClauseRole, table_map[identity]
        )
        table_map[identity] = selectable

        by_key = {}
        for column in selectable.c:
            if column.key == typecolname:
                raise sa_exc.InvalidRequestError(
                    "Polymorphic union can't use '%s' as the discriminator "
                    "column due to mapped column %r; please apply the "
                    "'typecolname' "
                    "argument; this is available on "
                    "ConcreteBase as '_concrete_discriminator_name'"
                    % (typecolname, column)
                )
            all_keys.add(column.key)
            by_key[column.key] = column
            type_by_key[column.key] = column.type
        columns_by_table[selectable] = by_key

    # how a column absent from a given table is rendered as a typed NULL
    typed_null = sql.cast if cast_nulls else sql.type_coerce

    def col(name, table):
        try:
            return columns_by_table[table][name]
        except KeyError:
            return typed_null(sql.null(), type_by_key[name]).label(name)

    selects = []
    for identity, selectable in table_map.items():
        columns = [col(name, selectable) for name in all_keys]
        if typecolname is not None:
            # the identity is rendered as a literal discriminator column
            columns.append(
                sql.literal_column(
                    sql_util._quote_ddl_expr(identity)
                ).label(typecolname)
            )
        selects.append(sql.select(*columns).select_from(selectable))
    return sql.union_all(*selects).alias(aliasname)
def identity_key(
    class_: Optional[Type[_T]] = None,
    ident: Union[Any, Tuple[Any, ...]] = None,
    *,
    instance: Optional[_T] = None,
    row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None,
    identity_token: Optional[Any] = None,
) -> _IdentityKeyType[_T]:
    r"""Generate "identity key" tuples, as are used as keys in the
    :attr:`.Session.identity_map` dictionary.

    This function has several call styles:

    * ``identity_key(class, ident, identity_token=token)``

      This form receives a mapped class and a primary key scalar or
      tuple as an argument.

      E.g.::

        >>> identity_key(MyClass, (1, 2))
        (<class '__main__.MyClass'>, (1, 2), None)

      :param class: mapped class (must be a positional argument)
      :param ident: primary key, may be a scalar or tuple argument.
      :param identity_token: optional identity token

    * ``identity_key(instance=instance)``

      This form will produce the identity key for a given instance.  The
      instance need not be persistent, only that its primary key attributes
      are populated (else the key will contain ``None`` for those missing
      values).

      E.g.::

        >>> instance = MyClass(1, 2)
        >>> identity_key(instance=instance)
        (<class '__main__.MyClass'>, (1, 2), None)

      In this form, the given instance is ultimately run through
      :meth:`_orm.Mapper.identity_key_from_instance`, which will have the
      effect of performing a database check for the corresponding row
      if the object is expired.

      :param instance: object instance (must be given as a keyword arg)

    * ``identity_key(class, row=row, identity_token=token)``

      This form is similar to the class/tuple form, except is passed a
      database result row as a :class:`.Row` or :class:`.RowMapping` object.
      When both ``row`` and ``ident`` are given, ``row`` is used.

      E.g.::

        >>> with engine.connect() as conn:
        ...     row = conn.execute(text("select * from table where a=1 and b=2")).first()
        >>> identity_key(MyClass, row=row)
        (<class '__main__.MyClass'>, (1, 2), None)

      :param class: mapped class (must be a positional argument)
      :param row: :class:`.Row` row returned by a :class:`_engine.CursorResult`
       (must be given as a keyword arg)
      :param identity_token: optional identity token

    :raises sqlalchemy.exc.ArgumentError: if neither a class nor an
     instance is given, or a class is given without ``ident`` or ``row``.

    """  # noqa: E501
    if class_ is None:
        # instance form
        if instance is None:
            raise sa_exc.ArgumentError("class or instance is required")
        return object_mapper(instance).identity_key_from_instance(instance)

    # class form: the mapper is resolved before the arguments are checked
    mapper = class_mapper(class_)
    if row is not None:
        return mapper.identity_key_from_row(
            row, identity_token=identity_token
        )
    if ident is None:
        raise sa_exc.ArgumentError("ident or row is required")
    return mapper.identity_key_from_primary_key(
        tuple(util.to_list(ident)), identity_token=identity_token
    )
class _TraceAdaptRole(enum.Enum):
    """Enumeration of all the use cases for ORMAdapter.

    ORMAdapter remains one of the most complicated aspects of the ORM, as it is
    used for in-place adaption of column expressions to be applied to a SELECT,
    replacing :class:`.Table` and other objects that are mapped to classes with
    aliases of those tables in the case of joined eager loading, or in the case
    of polymorphic loading as used with concrete mappings or other custom "with
    polymorphic" parameters, with whole user-defined subqueries. The
    enumerations provide an overview of all the use cases used by ORMAdapter, a
    layer of formality as to the introduction of new ORMAdapter use cases (of
    which none are anticipated), as well as a means to trace the origins of a
    particular ORMAdapter within runtime debugging.

    SQLAlchemy 2.0 has greatly scaled back ORM features which relied heavily on
    open-ended statement adaption, including the ``Query.with_polymorphic()``
    method and the ``Query.select_from_entity()`` methods, favoring
    user-explicit aliasing schemes using the ``aliased()`` and
    ``with_polymorphic()`` standalone constructs; these still use adaption,
    however the adaption is applied in a narrower scope.

    """

    # NOTE: member values come from enum.auto(), so they follow
    # declaration order; the role is stored on the adapter as ``.role``.

    # aliased() use that is used to adapt individual attributes at query
    # construction time
    ALIASED_INSP = enum.auto()

    # joinedload cases; typically adapt an ON clause of a relationship
    # join
    JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto()
    JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto()
    JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto()

    # polymorphic cases - these are complex ones that replace FROM
    # clauses, replacing tables with subqueries
    MAPPER_POLYMORPHIC_ADAPTER = enum.auto()
    WITH_POLYMORPHIC_ADAPTER = enum.auto()
    WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto()
    DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto()

    # the from_statement() case, used only to adapt individual attributes
    # from a given statement to local ORM attributes at result fetching
    # time.  assigned to ORMCompileState._from_obj_alias
    ADAPT_FROM_STATEMENT = enum.auto()

    # the joinedload for queries that have LIMIT/OFFSET/DISTINCT case;
    # the query is placed inside of a subquery with the LIMIT/OFFSET/etc.,
    # joinedloads are then placed on the outside.
    # assigned to ORMCompileState.compound_eager_adapter
    COMPOUND_EAGER_STATEMENT = enum.auto()

    # the legacy Query._set_select_from() case.
    # this is needed for Query's set operations (i.e. UNION, etc.)
    # as well as "legacy from_self()", which while removed from 2.0 as
    # public API, is used for the Query.count() method.  this one
    # still does full statement traversal
    # assigned to ORMCompileState._from_obj_alias
    LEGACY_SELECT_FROM_ALIAS = enum.auto()
class ORMStatementAdapter(sql_util.ColumnAdapter):
    """ColumnAdapter which includes a role attribute."""

    __slots__ = ("role",)

    def __init__(
        self,
        role: _TraceAdaptRole,
        selectable: Selectable,
        *,
        equivalents: Optional[_EquivalentColumnMap] = None,
        adapt_required: bool = False,
        allow_label_resolve: bool = True,
        anonymize_labels: bool = False,
        adapt_on_names: bool = False,
        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
    ):
        """Record ``role`` and construct the underlying ``ColumnAdapter``.

        :param role: the :class:`._TraceAdaptRole` identifying why this
         adapter was created; stored as ``self.role``.
        :param selectable: passed positionally to the parent constructor.

        All remaining keyword-only arguments are forwarded unchanged, by
        keyword, to ``ColumnAdapter.__init__``.
        """
        # the role is assigned before the parent constructor runs
        self.role = role
        super().__init__(
            selectable,
            equivalents=equivalents,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            adapt_on_names=adapt_on_names,
            adapt_from_selectables=adapt_from_selectables,
        )
class ORMAdapter(sql_util.ColumnAdapter):
    """ColumnAdapter subclass which excludes adaptation of entities from
    non-matching mappers.

    """

    __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp")

    is_aliased_class: bool
    aliased_insp: Optional[AliasedInsp[Any]]

    def __init__(
        self,
        role: _TraceAdaptRole,
        entity: _InternalEntityType[Any],
        *,
        equivalents: Optional[_EquivalentColumnMap] = None,
        adapt_required: bool = False,
        allow_label_resolve: bool = True,
        anonymize_labels: bool = False,
        selectable: Optional[Selectable] = None,
        limit_on_entity: bool = True,
        adapt_on_names: bool = False,
        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
    ):
        """Construct an adapter for ``entity``.

        :param role: the :class:`._TraceAdaptRole` describing this use;
         stored as ``self.role``.
        :param entity: inspected entity; its ``.mapper`` is stored as
         ``self.mapper``.
        :param selectable: selectable to adapt to; defaults to
         ``entity.selectable`` when ``None``.
        :param limit_on_entity: when true, :meth:`._include_fn` is passed
         as the parent's ``include_fn``; otherwise ``None`` is passed.

        The other keyword arguments are forwarded unchanged to
        ``ColumnAdapter.__init__``.
        """
        self.role = role
        self.mapper = entity.mapper

        target = entity.selectable if selectable is None else selectable

        # keep a reference to the entity only when it is an aliased class
        aliased_insp = entity if insp_is_aliased_class(entity) else None
        self.aliased_insp = aliased_insp
        self.is_aliased_class = aliased_insp is not None

        include_fn = self._include_fn if limit_on_entity else None
        super().__init__(
            target,
            equivalents=equivalents,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            include_fn=include_fn,
            adapt_on_names=adapt_on_names,
            adapt_from_selectables=adapt_from_selectables,
        )

    def _include_fn(self, elem):
        """Return a true value if ``elem`` should be adapted.

        Elements with no "parentmapper" annotation are always included;
        otherwise the annotated mapper must be related to ``self.mapper``
        via ``isa()`` in either direction.
        """
        parent = elem._annotations.get("parentmapper", None)
        if not parent:
            return True
        return parent.isa(self.mapper) or self.mapper.isa(parent)
class AliasedClass(
    inspection.Inspectable["AliasedInsp[_O]"], ORMColumnsClauseRole[_O]
):
    r"""Represents an "aliased" form of a mapped class for usage with Query.

    The ORM equivalent of a :func:`~sqlalchemy.sql.expression.alias`
    construct, this object mimics the mapped class using a
    ``__getattr__`` scheme and maintains a reference to a
    real :class:`~sqlalchemy.sql.expression.Alias` object.

    A primary purpose of :class:`.AliasedClass` is to serve as an alternate
    within a SQL statement generated by the ORM, such that an existing
    mapped entity can be used in multiple contexts.   A simple example::

        # find all pairs of users with the same name
        user_alias = aliased(User)
        session.query(User, user_alias).join(
            user_alias, User.id > user_alias.id
        ).filter(User.name == user_alias.name)

    :class:`.AliasedClass` is also capable of mapping an existing mapped
    class to an entirely new selectable, provided this selectable is column-
    compatible with the existing mapped selectable, and it can also be
    configured in a mapping as the target of a :func:`_orm.relationship`.
    See the links below for examples.

    The :class:`.AliasedClass` object is constructed typically using the
    :func:`_orm.aliased` function.   It also is produced with additional
    configuration when using the :func:`_orm.with_polymorphic` function.

    The resulting object is an instance of :class:`.AliasedClass`.
    This object implements an attribute scheme which produces the
    same attribute and method interface as the original mapped
    class, allowing :class:`.AliasedClass` to be compatible
    with any attribute technique which works on the original class,
    including hybrid attributes (see :ref:`hybrids_toplevel`).

    The :class:`.AliasedClass` can be inspected for its underlying
    :class:`_orm.Mapper`, aliased selectable, and other information
    using :func:`_sa.inspect`::

        from sqlalchemy import inspect

        my_alias = aliased(MyClass)
        insp = inspect(my_alias)

    The resulting inspection object is an instance of :class:`.AliasedInsp`.


    .. seealso::

        :func:`.aliased`

        :func:`.with_polymorphic`

        :ref:`relationship_aliased_class`

        :ref:`relationship_to_window_function`


    """

    __name__: str

    def __init__(
        self,
        mapped_class_or_ac: _EntityType[_O],
        alias: Optional[FromClause] = None,
        name: Optional[str] = None,
        flat: bool = False,
        adapt_on_names: bool = False,
        with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]] = None,
        with_polymorphic_discriminator: Optional[ColumnElement[Any]] = None,
        base_alias: Optional[AliasedInsp[Any]] = None,
        use_mapper_path: bool = False,
        represents_outer_join: bool = False,
    ):
        """Inspect the target, choose or build the alias selectable, and
        create the :class:`.AliasedInsp` that backs this object.

        When ``alias`` is not given, one is generated: from the target's
        own selectable if the target is itself an aliased class over a
        subquery, otherwise from the mapper's with-polymorphic selectable
        using ``name`` and ``flat``.
        """
        insp = cast(
            "_InternalEntityType[_O]", inspection.inspect(mapped_class_or_ac)
        )
        mapper = insp.mapper

        nest_adapters = False

        if alias is None:
            if insp.is_aliased_class and insp.selectable._is_subquery:
                alias = insp.selectable.alias()
            else:
                alias = (
                    mapper._with_polymorphic_selectable._anonymous_fromclause(
                        name=name,
                        flat=flat,
                    )
                )
        elif insp.is_aliased_class:
            # explicit alias of an existing aliased class: the new adapter
            # is wrapped by the existing one
            nest_adapters = True

        assert alias is not None
        self._aliased_insp = AliasedInsp(
            self,
            insp,
            alias,
            name,
            (
                with_polymorphic_mappers
                if with_polymorphic_mappers
                else mapper.with_polymorphic_mappers
            ),
            (
                with_polymorphic_discriminator
                if with_polymorphic_discriminator is not None
                else mapper.polymorphic_on
            ),
            base_alias,
            use_mapper_path,
            adapt_on_names,
            represents_outer_join,
            nest_adapters,
        )

        self.__name__ = f"aliased({mapper.class_.__name__})"

    @classmethod
    def _reconstitute_from_aliased_insp(
        cls, aliased_insp: AliasedInsp[_O]
    ) -> AliasedClass[_O]:
        """Build a new :class:`.AliasedClass` around an existing
        :class:`.AliasedInsp` without running ``__init__``.

        For a with-polymorphic inspection, sub-entities are reconstituted
        recursively and set as attributes named after their classes.
        """
        obj = cls.__new__(cls)
        obj.__name__ = f"aliased({aliased_insp.mapper.class_.__name__})"
        obj._aliased_insp = aliased_insp

        if aliased_insp._is_with_polymorphic:
            for sub_aliased_insp in aliased_insp._with_polymorphic_entities:
                if sub_aliased_insp is not aliased_insp:
                    ent = AliasedClass._reconstitute_from_aliased_insp(
                        sub_aliased_insp
                    )
                    setattr(obj, sub_aliased_insp.class_.__name__, ent)

        return obj

    def __getattr__(self, key: str) -> Any:
        """Proxy attribute access to the aliased target.

        :raises AttributeError: if this object has no ``_aliased_insp``
         yet, or if the target does not have the attribute.
        """
        try:
            _aliased_insp = self.__dict__["_aliased_insp"]
        except KeyError:
            # read from __dict__ directly so a missing ``_aliased_insp``
            # cannot re-enter __getattr__; report the name that was
            # actually requested rather than an empty error
            raise AttributeError(key) from None
        else:
            target = _aliased_insp._target
            # maintain all getattr mechanics
            attr = getattr(target, key)

        # attribute is a method, that will be invoked against a
        # "self"; so just return a new method with the same function and
        # new self
        if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
            return types.MethodType(attr.__func__, self)

        # attribute is a descriptor, that will be invoked against a
        # "self"; so invoke the descriptor against this self
        if hasattr(attr, "__get__"):
            attr = attr.__get__(None, self)

        # attributes within the QueryableAttribute system will want this
        # to be invoked so the object can be adapted
        if hasattr(attr, "adapt_to_entity"):
            attr = attr.adapt_to_entity(_aliased_insp)
            # cache the adapted attribute so later lookups bypass
            # __getattr__
            setattr(self, key, attr)

        return attr

    def _get_from_serialized(
        self, key: str, mapped_class: _O, aliased_insp: AliasedInsp[_O]
    ) -> Any:
        # this method is only used in terms of the
        # sqlalchemy.ext.serializer extension
        attr = getattr(mapped_class, key)
        if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
            return types.MethodType(attr.__func__, self)

        # attribute is a descriptor, that will be invoked against a
        # "self"; so invoke the descriptor against this self
        if hasattr(attr, "__get__"):
            attr = attr.__get__(None, self)

        # attributes within the QueryableAttribute system will want this
        # to be invoked so the object can be adapted
        if hasattr(attr, "adapt_to_entity"):
            # point the inspection object back at this AliasedClass
            # before adapting
            aliased_insp._weak_entity = weakref.ref(self)
            attr = attr.adapt_to_entity(aliased_insp)
            setattr(self, key, attr)

        return attr

    def __repr__(self) -> str:
        return "<AliasedClass at 0x%x; %s>" % (
            id(self),
            self._aliased_insp._target.__name__,
        )

    def __str__(self) -> str:
        return str(self._aliased_insp)
837@inspection._self_inspects
838class AliasedInsp(
839 ORMEntityColumnsClauseRole[_O],
840 ORMFromClauseRole,
841 HasCacheKey,
842 InspectionAttr,
843 MemoizedSlots,
844 inspection.Inspectable["AliasedInsp[_O]"],
845 Generic[_O],
846):
847 """Provide an inspection interface for an
848 :class:`.AliasedClass` object.
850 The :class:`.AliasedInsp` object is returned
851 given an :class:`.AliasedClass` using the
852 :func:`_sa.inspect` function::
854 from sqlalchemy import inspect
855 from sqlalchemy.orm import aliased
857 my_alias = aliased(MyMappedClass)
858 insp = inspect(my_alias)
860 Attributes on :class:`.AliasedInsp`
861 include:
863 * ``entity`` - the :class:`.AliasedClass` represented.
864 * ``mapper`` - the :class:`_orm.Mapper` mapping the underlying class.
865 * ``selectable`` - the :class:`_expression.Alias`
866 construct which ultimately
867 represents an aliased :class:`_schema.Table` or
868 :class:`_expression.Select`
869 construct.
870 * ``name`` - the name of the alias. Also is used as the attribute
871 name when returned in a result tuple from :class:`_query.Query`.
872 * ``with_polymorphic_mappers`` - collection of :class:`_orm.Mapper`
873 objects
874 indicating all those mappers expressed in the select construct
875 for the :class:`.AliasedClass`.
876 * ``polymorphic_on`` - an alternate column or SQL expression which
877 will be used as the "discriminator" for a polymorphic load.
879 .. seealso::
881 :ref:`inspection_toplevel`
883 """
885 __slots__ = (
886 "__weakref__",
887 "_weak_entity",
888 "mapper",
889 "selectable",
890 "name",
891 "_adapt_on_names",
892 "with_polymorphic_mappers",
893 "polymorphic_on",
894 "_use_mapper_path",
895 "_base_alias",
896 "represents_outer_join",
897 "persist_selectable",
898 "local_table",
899 "_is_with_polymorphic",
900 "_with_polymorphic_entities",
901 "_adapter",
902 "_target",
903 "__clause_element__",
904 "_memoized_values",
905 "_all_column_expressions",
906 "_nest_adapters",
907 )
909 _cache_key_traversal = [
910 ("name", visitors.ExtendedInternalTraversal.dp_string),
911 ("_adapt_on_names", visitors.ExtendedInternalTraversal.dp_boolean),
912 ("_use_mapper_path", visitors.ExtendedInternalTraversal.dp_boolean),
913 ("_target", visitors.ExtendedInternalTraversal.dp_inspectable),
914 ("selectable", visitors.ExtendedInternalTraversal.dp_clauseelement),
915 (
916 "with_polymorphic_mappers",
917 visitors.InternalTraversal.dp_has_cache_key_list,
918 ),
919 ("polymorphic_on", visitors.InternalTraversal.dp_clauseelement),
920 ]
922 mapper: Mapper[_O]
923 selectable: FromClause
924 _adapter: ORMAdapter
925 with_polymorphic_mappers: Sequence[Mapper[Any]]
926 _with_polymorphic_entities: Sequence[AliasedInsp[Any]]
928 _weak_entity: weakref.ref[AliasedClass[_O]]
929 """the AliasedClass that refers to this AliasedInsp"""
931 _target: Union[Type[_O], AliasedClass[_O]]
932 """the thing referenced by the AliasedClass/AliasedInsp.
934 In the vast majority of cases, this is the mapped class. However
935 it may also be another AliasedClass (alias of alias).
937 """
def __init__(
    self,
    entity: AliasedClass[_O],
    inspected: _InternalEntityType[_O],
    selectable: FromClause,
    name: Optional[str],
    with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]],
    polymorphic_on: Optional[ColumnElement[Any]],
    _base_alias: Optional[AliasedInsp[Any]],
    _use_mapper_path: bool,
    adapt_on_names: bool,
    represents_outer_join: bool,
    nest_adapters: bool,
):
    """Populate the inspection state for an :class:`.AliasedClass`.

    :param entity: the owning :class:`.AliasedClass`; only a weak
     reference to it is kept.
    :param inspected: inspection of the thing being aliased; supplies
     ``.mapper`` and ``.entity`` (stored as ``_target``).
    :param selectable: the FROM clause stored as ``selectable``,
     ``persist_selectable`` and ``local_table``.
    :param with_polymorphic_mappers: when non-empty, an
     :class:`.AliasedClass` is created for every mapper other than the
     primary one and set as an attribute on the owning entity.
    :param _base_alias: inspection to reference as the base alias;
     defaults to ``self``.  Stored as a weak reference.
    :param nest_adapters: when true, ``inspected`` must be an
     :class:`.AliasedInsp` and its adapter wraps the new one.
    """
    mapped_class_or_ac = inspected.entity
    mapper = inspected.mapper

    self._weak_entity = weakref.ref(entity)
    self.mapper = mapper
    self.selectable = self.persist_selectable = self.local_table = (
        selectable
    )
    self.name = name
    self.polymorphic_on = polymorphic_on
    self._base_alias = weakref.ref(_base_alias or self)
    self._use_mapper_path = _use_mapper_path
    self.represents_outer_join = represents_outer_join
    self._nest_adapters = nest_adapters

    if with_polymorphic_mappers:
        self._is_with_polymorphic = True
        self.with_polymorphic_mappers = with_polymorphic_mappers
        self._with_polymorphic_entities = []
        for poly in self.with_polymorphic_mappers:
            if poly is not mapper:
                # sub-entity against the same selectable, pointing back
                # at this inspection as its base alias
                ent = AliasedClass(
                    poly.class_,
                    selectable,
                    base_alias=self,
                    adapt_on_names=adapt_on_names,
                    use_mapper_path=_use_mapper_path,
                )

                # expose the sub-entity on the owning AliasedClass under
                # the subclass name
                setattr(self.entity, poly.class_.__name__, ent)
                self._with_polymorphic_entities.append(ent._aliased_insp)

    else:
        self._is_with_polymorphic = False
        self.with_polymorphic_mappers = [mapper]

    self._adapter = ORMAdapter(
        _TraceAdaptRole.ALIASED_INSP,
        mapper,
        selectable=selectable,
        equivalents=mapper._equivalent_columns,
        adapt_on_names=adapt_on_names,
        anonymize_labels=True,
        # make sure the adapter doesn't try to grab other tables that
        # are not even the thing we are mapping, such as embedded
        # selectables in subqueries or CTEs.  See issue #6060
        # (the set is empty when adapt_on_names is true)
        adapt_from_selectables={
            m.selectable
            for m in self.with_polymorphic_mappers
            if not adapt_on_names
        },
        limit_on_entity=False,
    )

    if nest_adapters:
        # supports "aliased class of aliased class" use case
        assert isinstance(inspected, AliasedInsp)
        self._adapter = inspected._adapter.wrap(self._adapter)

    self._adapt_on_names = adapt_on_names
    self._target = mapped_class_or_ac
@property
def _post_inspect(self):  # type: ignore[override]
    """Call ``_check_configure()`` on the mapper; evaluates to ``None``."""
    mapped = self.mapper
    mapped._check_configure()
@classmethod
def _alias_factory(
    cls,
    element: Union[_EntityType[_O], FromClause],
    alias: Optional[FromClause] = None,
    name: Optional[str] = None,
    flat: bool = False,
    adapt_on_names: bool = False,
) -> Union[AliasedClass[_O], FromClause]:
    """Produce an alias of either a FROM clause or an ORM entity.

    A ``FromClause`` is aliased directly (named when ``name`` is given,
    anonymously otherwise); anything else is wrapped in an
    ``AliasedClass``.

    :raises sqlalchemy.exc.ArgumentError: if ``adapt_on_names`` is set
     while ``element`` is a ``FromClause``.
    """
    if not isinstance(element, FromClause):
        return AliasedClass(
            element,
            alias=alias,
            flat=flat,
            name=name,
            adapt_on_names=adapt_on_names,
        )

    if adapt_on_names:
        raise sa_exc.ArgumentError(
            "adapt_on_names only applies to ORM elements"
        )

    if name:
        return element.alias(name=name, flat=flat)

    # see selectable.py->Alias._factory() for similar
    # mypy issue.  Cannot get the overload to see this
    # in mypy (works fine in pyright)
    return coercions.expect(  # type: ignore[no-any-return]
        roles.AnonymizedFromClauseRole, element, flat=flat
    )
@classmethod
def _with_polymorphic_factory(
    cls,
    base: Union[Type[_O], Mapper[_O]],
    classes: Union[Literal["*"], Iterable[_EntityType[Any]]],
    selectable: Union[Literal[False, None], FromClause] = False,
    flat: bool = False,
    polymorphic_on: Optional[ColumnElement[Any]] = None,
    aliased: bool = False,
    innerjoin: bool = False,
    adapt_on_names: bool = False,
    name: Optional[str] = None,
    _use_mapper_path: bool = False,
) -> AliasedClass[_O]:
    """Build the ``AliasedClass`` behind ``with_polymorphic()``.

    The mappers and selectable are resolved through the base mapper's
    ``_with_polymorphic_args()``; the selectable is additionally made
    anonymous when ``aliased`` or ``flat`` is set.

    :raises sqlalchemy.exc.ArgumentError: if both an explicit
     ``selectable`` and ``flat`` are passed.
    """
    base_mapper = _class_to_mapper(base)

    has_explicit_selectable = selectable not in (None, False)
    if has_explicit_selectable and flat:
        raise sa_exc.ArgumentError(
            "the 'flat' and 'selectable' arguments cannot be passed "
            "simultaneously to with_polymorphic()"
        )

    poly_mappers, selectable = base_mapper._with_polymorphic_args(
        classes, selectable, innerjoin=innerjoin
    )
    if aliased or flat:
        assert selectable is not None
        selectable = selectable._anonymous_fromclause(flat=flat)

    return AliasedClass(
        base,
        selectable,
        name=name,
        with_polymorphic_mappers=poly_mappers,
        adapt_on_names=adapt_on_names,
        with_polymorphic_discriminator=polymorphic_on,
        use_mapper_path=_use_mapper_path,
        represents_outer_join=not innerjoin,
    )
@property
def entity(self) -> AliasedClass[_O]:
    """Return the ``AliasedClass`` for this object, rebuilding it if the
    weakly-held original has been garbage collected."""
    # to eliminate reference cycles, the AliasedClass is held weakly.
    # this produces some situations where the AliasedClass gets lost,
    # particularly when one is created internally and only the AliasedInsp
    # is passed around.
    live = self._weak_entity()
    if live is not None:
        return live

    # to work around this case, we just generate a new one when we need
    # it, as it is a simple class with very little initial state on it.
    rebuilt = AliasedClass._reconstitute_from_aliased_insp(self)
    self._weak_entity = weakref.ref(rebuilt)
    return rebuilt
# class-level flag; constant for every instance of this class
is_aliased_class = True
"always returns True"
def _memoized_method___clause_element__(self) -> FromClause:
    """Return the selectable annotated with this entity and with ORM
    propagate attributes set."""
    entity_annotations = {
        "parentmapper": self.mapper,
        "parententity": self,
        "entity_namespace": self,
    }
    annotated = self.selectable._annotate(entity_annotations)
    return annotated._set_propagate_attrs(
        {"compile_state_plugin": "orm", "plugin_subject": self}
    )
@property
def entity_namespace(self) -> AliasedClass[_O]:
    """Return the same object as :attr:`entity`."""
    namespace = self.entity
    return namespace
@property
def class_(self) -> Type[_O]:
    """Return the mapped class ultimately represented by this
    :class:`.AliasedInsp`."""
    mapped = self.mapper
    return mapped.class_
@property
def _path_registry(self) -> _AbstractEntityRegistry:
    """Return the mapper's path registry when ``_use_mapper_path`` is
    set, otherwise a registry obtained from
    ``PathRegistry.per_mapper(self)``."""
    if not self._use_mapper_path:
        return PathRegistry.per_mapper(self)
    return self.mapper._path_registry
def __getstate__(self) -> Dict[str, Any]:
    """Return the pickle state: a dictionary holding the values that
    ``__setstate__`` passes back to ``__init__``."""
    return dict(
        entity=self.entity,
        mapper=self.mapper,
        alias=self.selectable,
        name=self.name,
        adapt_on_names=self._adapt_on_names,
        with_polymorphic_mappers=self.with_polymorphic_mappers,
        with_polymorphic_discriminator=self.polymorphic_on,
        # the base alias is held weakly; dereference it for pickling
        base_alias=self._base_alias(),
        use_mapper_path=self._use_mapper_path,
        represents_outer_join=self.represents_outer_join,
        nest_adapters=self._nest_adapters,
    )
def __setstate__(self, state: Dict[str, Any]) -> None:
    """Restore from pickle state by re-running ``__init__`` with the
    stored values, in ``__init__``'s positional order."""
    # note: the order here follows the __init__ signature, which differs
    # from the key order produced by __getstate__
    init_order = (
        "entity",
        "mapper",
        "alias",
        "name",
        "with_polymorphic_mappers",
        "with_polymorphic_discriminator",
        "base_alias",
        "use_mapper_path",
        "adapt_on_names",
        "represents_outer_join",
        "nest_adapters",
    )
    init_args = [state[key] for key in init_order]
    self.__init__(*init_args)  # type: ignore
def _merge_with(self, other: AliasedInsp[_O]) -> AliasedInsp[_O]:
    """Combine the polymorphic class sets of ``self`` and ``other``.

    Returns ``other`` unchanged when both cover the same classes;
    otherwise builds a new aliased entity over the union of the classes,
    taking the remaining settings from ``other``.  Both must share the
    same primary mapper.
    """
    # assert self._is_with_polymorphic
    # assert other._is_with_polymorphic

    shared_mapper = other.mapper

    assert self.mapper is shared_mapper

    ours = util.to_set(mp.class_ for mp in self.with_polymorphic_mappers)
    theirs = {mp.class_ for mp in other.with_polymorphic_mappers}
    if ours == theirs:
        return other

    combined = ours.union(theirs)

    poly_mappers, merged_selectable = shared_mapper._with_polymorphic_args(
        combined, None, innerjoin=not other.represents_outer_join
    )
    merged_selectable = merged_selectable._anonymous_fromclause(flat=True)
    merged_alias = AliasedClass(
        shared_mapper,
        merged_selectable,
        with_polymorphic_mappers=poly_mappers,
        with_polymorphic_discriminator=other.polymorphic_on,
        use_mapper_path=other._use_mapper_path,
        represents_outer_join=other.represents_outer_join,
    )
    return merged_alias._aliased_insp
def _adapt_element(
    self, expr: _ORMCOLEXPR, key: Optional[str] = None
) -> _ORMCOLEXPR:
    """Adapt a column expression to this alias and annotate it.

    :param expr: the column expression; must be a ``ColumnElement``.
    :param key: optional attribute key; when truthy it is recorded as
     the ``proxy_key`` annotation and is named in the warning emitted
     if the expression cannot be located.
    :return: the adapted expression (or ``expr`` itself if it was not
     located), annotated with the parent entity and mapper.
    """
    assert isinstance(expr, ColumnElement)
    annotations: Dict[str, Any] = {
        "parententity": self,
        "parentmapper": self.mapper,
    }
    if key:
        annotations["proxy_key"] = key

    # userspace adapt of an attribute from AliasedClass; validate that
    # it actually was present
    located = self._adapter.adapt_check_present(expr)
    if located is None:
        located = expr
        if self._adapter.adapt_on_names:
            message = (
                "Did not locate an expression in selectable for "
                "attribute %r; ensure name is correct in expression"
            )
        else:
            message = (
                "Did not locate an expression in selectable for "
                "attribute %r; to match by name, use the "
                "adapt_on_names parameter"
            )
        util.warn_limited(message, (key,))

    annotated = located._annotate(annotations)
    return annotated._set_propagate_attrs(
        {"compile_state_plugin": "orm", "plugin_subject": self}
    )
if TYPE_CHECKING:
    # establish compatibility with the _ORMAdapterProto protocol,
    # which in turn is compatible with _CoreAdapterProto.

    def _orm_adapt_element(
        self,
        obj: _CE,
        key: Optional[str] = None,
    ) -> _CE: ...

else:
    # at runtime the name is simply a second binding of _adapt_element;
    # the stub above exists only for the type checker
    _orm_adapt_element = _adapt_element
def _entity_for_mapper(self, mapper):
    """Return the aliased inspection object that corresponds to ``mapper``.

    :param mapper: the mapper to look up.
    :return: ``self`` when ``mapper`` is the primary mapper or inherits
     from it; for any other member of ``with_polymorphic_mappers``, the
     ``_aliased_insp`` of the sub-entity stored on the entity under the
     mapped class name.
    :raises AssertionError: if ``mapper`` is neither a polymorphic member
     nor a descendant of the primary mapper.
    """
    if mapper in self.with_polymorphic_mappers:
        if mapper is self.mapper:
            return self
        return getattr(self.entity, mapper.class_.__name__)._aliased_insp

    if mapper.isa(self.mapper):
        return self

    # raise explicitly rather than "assert False": an assert statement is
    # stripped under "python -O", which would let this fall through and
    # return None
    raise AssertionError(
        "mapper %s doesn't correspond to %s" % (mapper, self)
    )
def _memoized_attr__get_clause(self):
    """Return the mapper's ``_get_clause`` pair with the clause and each
    key of the replacement map passed through this alias's adapter."""
    mapper_clause, mapper_params = self.mapper._get_clause
    adapted_clause = self._adapter.traverse(mapper_clause)
    adapted_params = {}
    for column, bind in mapper_params.items():
        adapted_params[self._adapter.traverse(column)] = bind
    return adapted_clause, adapted_params
def _memoized_attr__memoized_values(self):
    """Return a new empty dictionary; ``_memo()`` stores its results in
    ``self._memoized_values``."""
    return dict()
def _memoized_attr__all_column_expressions(self):
    """Return a column collection of the mapper's keyed columns, each
    adapted to this alias.

    For a with_polymorphic entity the mappers of the sub-entities are
    passed to the mapper's ``_columns_plus_keys()`` as well.
    """
    if self._is_with_polymorphic:
        sub_mappers = [ent.mapper for ent in self._with_polymorphic_entities]
        keyed_columns = self.mapper._columns_plus_keys(sub_mappers)
    else:
        keyed_columns = self.mapper._columns_plus_keys()

    adapted = [
        (col_key, self._adapt_element(column))
        for col_key, column in keyed_columns
    ]

    return WriteableColumnCollection(adapted)
def _memo(self, key, callable_, *args, **kw):
    """Return the value cached under ``key`` in ``_memoized_values``,
    computing it with ``callable_(*args, **kw)`` and caching it on the
    first request."""
    if key not in self._memoized_values:
        computed = callable_(*args, **kw)
        self._memoized_values[key] = computed
        return computed
    return self._memoized_values[key]
def __repr__(self):
    """Return a debugging representation showing the object id, the
    mapped class name and, when present, the polymorphic class names."""
    poly_mappers = self.with_polymorphic_mappers
    suffix = (
        "(%s)" % ", ".join(mp.class_.__name__ for mp in poly_mappers)
        if poly_mappers
        else ""
    )
    return "<AliasedInsp at 0x%x; %s%s>" % (
        id(self),
        self.class_.__name__,
        suffix,
    )
def __str__(self):
    """Return ``aliased(<target>)`` for a plain alias, or
    ``with_polymorphic(<target>, [<subclasses>])`` for a polymorphic
    one, listing every polymorphic class except the primary mapper's."""
    if not self._is_with_polymorphic:
        return "aliased(%s)" % (self._target.__name__,)

    target_name = self._target.__name__
    sub_names = ", ".join(
        mp.class_.__name__
        for mp in self.with_polymorphic_mappers
        if mp is not self.mapper
    )
    return "with_polymorphic(%s, [%s])" % (target_name, sub_names)
class _WrapUserEntity:
    """A wrapper used within the loader_criteria lambda caller so that
    we can bypass declared_attr descriptors on unmapped mixins, which
    normally emit a warning for such use.

    might also be useful for other per-lambda instrumentations should
    the need arise.

    """

    __slots__ = ("subject",)

    def __init__(self, subject):
        """Store the wrapped object."""
        self.subject = subject

    @util.preload_module("sqlalchemy.orm.decl_api")
    def __getattribute__(self, name):
        """Proxy attribute access to the wrapped subject.

        A ``declared_attr`` found directly in the subject's ``__dict__``
        is evaluated by calling its ``fget`` with the subject; every
        other name goes through a plain ``getattr()`` on the subject.
        """
        decl_api = util.preloaded.orm.decl_api

        # go through object.__getattribute__ so that reading "subject"
        # does not re-enter this method
        wrapped = object.__getattribute__(self, "subject")
        own_value = wrapped.__dict__.get(name)
        if isinstance(own_value, decl_api.declared_attr):
            return own_value.fget(wrapped)
        return getattr(wrapped, name)
class LoaderCriteriaOption(CriteriaOption):
    """Add additional WHERE criteria to the load for all occurrences of
    a particular entity.

    :class:`_orm.LoaderCriteriaOption` is invoked using the
    :func:`_orm.with_loader_criteria` function; see that function for
    details.

    .. versionadded:: 1.4

    """

    __slots__ = (
        "root_entity",
        "entity",
        "deferred_where_criteria",
        "where_criteria",
        "_where_crit_orig",
        "include_aliases",
        "propagate_to_loaders",
    )

    _traverse_internals = [
        ("root_entity", visitors.ExtendedInternalTraversal.dp_plain_obj),
        ("entity", visitors.ExtendedInternalTraversal.dp_has_cache_key),
        ("where_criteria", visitors.InternalTraversal.dp_clauseelement),
        ("include_aliases", visitors.InternalTraversal.dp_boolean),
        ("propagate_to_loaders", visitors.InternalTraversal.dp_boolean),
    ]

    root_entity: Optional[Type[Any]]
    entity: Optional[_InternalEntityType[Any]]
    where_criteria: Union[ColumnElement[bool], lambdas.DeferredLambdaElement]
    deferred_where_criteria: bool
    include_aliases: bool
    propagate_to_loaders: bool

    _where_crit_orig: Any

    def __init__(
        self,
        entity_or_base: _EntityType[Any],
        where_criteria: Union[
            _ColumnExpressionArgument[bool],
            Callable[[Any], _ColumnExpressionArgument[bool]],
        ],
        loader_only: bool = False,
        include_aliases: bool = False,
        propagate_to_loaders: bool = True,
        track_closure_variables: bool = True,
    ):
        """Construct the option.

        :param entity_or_base: an inspectable entity, or any other class,
         which is then kept as ``root_entity`` so that its subclasses can
         be searched for mapped entities.
        :param where_criteria: a SQL expression, or a callable that is
         wrapped in a deferred lambda element and invoked with the
         entity.
        :param loader_only: accepted but not read by this constructor.
        :param include_aliases: stored as-is.
        :param propagate_to_loaders: stored as-is.
        :param track_closure_variables: passed to the lambda options
         when ``where_criteria`` is a callable.
        """
        inspected = cast(
            "_InternalEntityType[Any]",
            inspection.inspect(entity_or_base, False),
        )
        if inspected is not None:
            self.root_entity = None
            self.entity = inspected
        else:
            self.root_entity = cast("Type[Any]", entity_or_base)
            self.entity = None

        # the unprocessed criteria is retained for __reduce__
        self._where_crit_orig = where_criteria
        if not callable(where_criteria):
            self.deferred_where_criteria = False
            self.where_criteria = coercions.expect(
                roles.WhereHavingRole, where_criteria
            )
        else:
            if self.root_entity is not None:
                lambda_subject = self.root_entity
            else:
                assert inspected is not None
                lambda_subject = inspected.entity

            self.deferred_where_criteria = True
            self.where_criteria = lambdas.DeferredLambdaElement(
                where_criteria,
                roles.WhereHavingRole,
                lambda_args=(_WrapUserEntity(lambda_subject),),
                opts=lambdas.LambdaOptions(
                    track_closure_variables=track_closure_variables
                ),
            )

        self.include_aliases = include_aliases
        self.propagate_to_loaders = propagate_to_loaders

    @classmethod
    def _unreduce(
        cls, entity, where_criteria, include_aliases, propagate_to_loaders
    ):
        """Rebuild an option from the argument tuple that ``__reduce__``
        emits."""
        return LoaderCriteriaOption(
            entity,
            where_criteria,
            include_aliases=include_aliases,
            propagate_to_loaders=propagate_to_loaders,
        )

    def __reduce__(self):
        """Pickle support: reduce to the class (or root entity), the
        original criteria and the two boolean flags."""
        if self.entity:
            target = self.entity.class_
        else:
            target = self.root_entity
        reduce_args = (
            target,
            self._where_crit_orig,
            self.include_aliases,
            self.propagate_to_loaders,
        )
        return (LoaderCriteriaOption._unreduce, reduce_args)

    def _all_mappers(self) -> Iterator[Mapper[Any]]:
        """Yield the mappers this option applies to.

        With an inspected entity these are its mapper and descendants.
        Otherwise the subclasses of ``root_entity`` are walked; each
        inspectable subclass contributes its mapper and descendants,
        and the subclasses of non-inspectable ones are searched in turn.
        """
        if self.entity:
            yield from self.entity.mapper.self_and_descendants
            return

        assert self.root_entity
        pending = list(self.root_entity.__subclasses__())
        while pending:
            candidate = pending.pop(0)
            candidate_insp = cast(
                "_InternalEntityType[Any]",
                inspection.inspect(candidate, raiseerr=False),
            )
            if not candidate_insp:
                pending.extend(candidate.__subclasses__())
                continue
            yield from candidate_insp.mapper.self_and_descendants

    def _should_include(self, compile_state: _ORMCompileState) -> bool:
        """Return False when the statement being compiled is annotated
        with this very option under ``for_loader_criteria``; True
        otherwise."""
        statement_annotations = compile_state.select_statement._annotations
        owner = statement_annotations.get("for_loader_criteria", None)
        return owner is not self

    def _resolve_where_criteria(
        self, ext_info: _InternalEntityType[Any]
    ) -> ColumnElement[bool]:
        """Return the criteria for ``ext_info``, resolving a deferred
        lambda against its entity first, deep-annotated with this option
        under ``for_loader_criteria``."""
        if not self.deferred_where_criteria:
            resolved = self.where_criteria  # type: ignore
        else:
            resolved = cast(
                "ColumnElement[bool]",
                self.where_criteria._resolve_with_args(ext_info.entity),
            )
        assert isinstance(resolved, ColumnElement)
        return sql_util._deep_annotate(
            resolved,
            {"for_loader_criteria": self},
            detect_subquery_cols=True,
            ind_cols_on_fromclause=True,
        )

    def process_compile_state_replaced_entities(
        self,
        compile_state: _ORMCompileState,
        mapper_entities: Iterable[_MapperEntity],
    ) -> None:
        """Delegate to :meth:`process_compile_state`; ``mapper_entities``
        is not used."""
        self.process_compile_state(compile_state)

    def process_compile_state(self, compile_state: _ORMCompileState) -> None:
        """Apply a modification to a given :class:`.CompileState`."""

        # if options to limit the criteria to immediate query only,
        # use compile_state.attributes instead

        self.get_global_criteria(compile_state.global_attributes)

    def get_global_criteria(self, attributes: Dict[Any, Any]) -> None:
        """Append this option to the ``("additional_entity_criteria",
        mapper)`` list in ``attributes`` for every applicable mapper,
        creating the list when absent."""
        for mp in self._all_mappers():
            criteria_key = ("additional_entity_criteria", mp)
            attributes.setdefault(criteria_key, []).append(self)
# register an inspection hook for AliasedClass: inspecting one yields the
# object stored on its ``_aliased_insp`` attribute
inspection._inspects(AliasedClass)(lambda target: target._aliased_insp)
@inspection._inspects(type)
def _inspect_mc(
    class_: Type[_O],
) -> Optional[Mapper[_O]]:
    """Inspection hook for plain classes.

    Returns the mapper held by the class's manager, or ``None`` when the
    class has no manager, the manager is not mapped, or the lookup
    raises ``NO_STATE``.
    """
    try:
        manager = opt_manager_of_class(class_)
        if manager is not None and manager.is_mapped:
            return manager.mapper
    except orm_exc.NO_STATE:
        pass
    return None
# the runtime type of a subscripted typing generic such as ``List[Any]``
GenericAlias = type(List[Any])
@inspection._inspects(GenericAlias)
def _inspect_generic_alias(
    class_: Type[_O],
) -> Optional[Mapper[_O]]:
    """Inspection hook for subscripted generics: inspect the origin
    class reported by ``typing.get_origin()`` instead."""
    unsubscripted = cast("Type[_O]", get_origin(class_))
    return _inspect_mc(unsubscripted)
@inspection._self_inspects
class Bundle(
    ORMColumnsClauseRole[_T],
    SupportsCloneAnnotations,
    MemoizedHasCacheKey,
    inspection.Inspectable["Bundle[_T]"],
    InspectionAttr,
):
    """A grouping of SQL expressions that are returned by a :class:`.Query`
    under one namespace.

    The :class:`.Bundle` essentially allows nesting of the tuple-based
    results returned by a column-oriented :class:`_query.Query` object.
    It also
    is extensible via simple subclassing, where the primary capability
    to override is that of how the set of expressions should be returned,
    allowing post-processing as well as custom return types, without
    involving ORM identity-mapped classes.

    .. seealso::

        :ref:`bundles`

        :class:`.DictBundle`

    """

    single_entity = False
    """If True, queries for a single Bundle will be returned as a single
    entity, rather than an element within a keyed tuple."""

    is_clause_element = False

    is_mapper = False

    is_aliased_class = False

    is_bundle = True

    _propagate_attrs: _PropagateAttrsType = util.immutabledict()

    proxy_set = util.EMPTY_SET

    exprs: List[_ColumnsClauseElement]

    def __init__(
        self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
    ) -> None:
        r"""Construct a new :class:`.Bundle`.

        e.g.::

            bn = Bundle("mybundle", MyClass.x, MyClass.y)

            for row in session.query(bn).filter(bn.c.x == 5).filter(bn.c.y == 4):
                print(row.mybundle.x, row.mybundle.y)

        :param name: name of the bundle.
        :param \*exprs: columns or SQL expressions comprising the bundle.
        :param single_entity=False: if True, rows for this :class:`.Bundle`
         can be returned as a "single entity" outside of any enclosing tuple
         in the same manner as a mapped entity.

        """  # noqa: E501
        self.name = self._label = name
        coerced_exprs = [
            coercions.expect(
                roles.ColumnsClauseRole, expr, apply_propagate_attrs=self
            )
            for expr in exprs
        ]
        self.exprs = coerced_exprs

        # a nested Bundle is recovered from the "bundle" annotation of its
        # coerced form; an element without a "key" attribute is keyed by
        # its "_label"
        self.c = self.columns = WriteableColumnCollection(
            (getattr(col, "key", col._label), col)
            for col in [e._annotations.get("bundle", e) for e in coerced_exprs]
        ).as_readonly()
        self.single_entity = kw.pop("single_entity", self.single_entity)

    def _gen_cache_key(
        self, anon_map: anon_map, bindparams: List[BindParameter[Any]]
    ) -> Tuple[Any, ...]:
        """Return a cache key built from the class, the name, the
        ``single_entity`` flag and the cache key of every expression."""
        return (self.__class__, self.name, self.single_entity) + tuple(
            [expr._gen_cache_key(anon_map, bindparams) for expr in self.exprs]
        )

    @property
    def mapper(self) -> Optional[Mapper[Any]]:
        """The ``parentmapper`` annotation of the first expression, or
        ``None`` if it has none."""
        mp: Optional[Mapper[Any]] = self.exprs[0]._annotations.get(
            "parentmapper", None
        )
        return mp

    @property
    def entity(self) -> Optional[_InternalEntityType[Any]]:
        """The ``parententity`` annotation of the first expression, or
        ``None`` if it has none."""
        ie: Optional[_InternalEntityType[Any]] = self.exprs[
            0
        ]._annotations.get("parententity", None)
        return ie

    @property
    def entity_namespace(
        self,
    ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]:
        """The column collection of this bundle (same as :attr:`c`)."""
        return self.c

    columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]

    """A namespace of SQL expressions referred to by this :class:`.Bundle`.

        e.g.::

            bn = Bundle("mybundle", MyClass.x, MyClass.y)

            q = sess.query(bn).filter(bn.c.x == 5)

        Nesting of bundles is also supported::

            b1 = Bundle(
                "b1",
                Bundle("b2", MyClass.a, MyClass.b),
                Bundle("b3", MyClass.x, MyClass.y),
            )

            q = sess.query(b1).filter(b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9)

    .. seealso::

        :attr:`.Bundle.c`

    """  # noqa: E501

    c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
    """An alias for :attr:`.Bundle.columns`."""

    def _clone(self, **kw):
        """Return a shallow copy: a new instance of the same class whose
        ``__dict__`` is updated from this one's."""
        cloned = self.__class__.__new__(self.__class__)
        cloned.__dict__.update(self.__dict__)
        return cloned

    def __clause_element__(self):
        """Return a ``ClauseList`` of the bundled expressions, annotated
        with this bundle and set to use the ORM compile-state plugin."""
        # ensure existing entity_namespace remains
        annotations = {"bundle": self, "entity_namespace": self}
        annotations.update(self._annotations)

        plugin_subject = self.exprs[0]._propagate_attrs.get(
            "plugin_subject", self.entity
        )
        return (
            expression.ClauseList(
                _literal_as_text_role=roles.ColumnsClauseRole,
                group=False,
                *[e._annotations.get("bundle", e) for e in self.exprs],
            )
            ._annotate(annotations)
            ._set_propagate_attrs(
                # the Bundle *must* use the orm plugin no matter what.  the
                # subject can be None but it's much better if it's not.
                {
                    "compile_state_plugin": "orm",
                    "plugin_subject": plugin_subject,
                }
            )
        )

    @property
    def clauses(self):
        """The ``clauses`` of this bundle's clause element."""
        return self.__clause_element__().clauses

    def label(self, name):
        """Provide a copy of this :class:`.Bundle` passing a new label."""

        cloned = self._clone()
        # __init__ sets ``name`` and ``_label`` together; keep them in
        # sync here as well, since an enclosing Bundle keys a nested
        # bundle by ``_label``
        cloned.name = cloned._label = name
        return cloned

    def create_row_processor(
        self,
        query: Select[Unpack[TupleAny]],
        procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]],
        labels: Sequence[str],
    ) -> Callable[[Row[Unpack[TupleAny]]], Any]:
        """Produce the "row processing" function for this :class:`.Bundle`.

        May be overridden by subclasses to provide custom behaviors when
        results are fetched. The method is passed the statement object and a
        set of "row processor" functions at query execution time; these
        processor functions when given a result row will return the individual
        attribute value, which can then be adapted into any kind of return data
        structure.

        The example below illustrates replacing the usual :class:`.Row`
        return structure with a straight Python dictionary::

            from sqlalchemy.orm import Bundle


            class DictBundle(Bundle):
                def create_row_processor(self, query, procs, labels):
                    "Override create_row_processor to return values as dictionaries"

                    def proc(row):
                        return dict(zip(labels, (proc(row) for proc in procs)))

                    return proc

        A result from the above :class:`_orm.Bundle` will return dictionary
        values::

            bn = DictBundle("mybundle", MyClass.data1, MyClass.data2)
            for row in session.execute(select(bn).where(bn.c.data1 == "d1")):
                print(row.mybundle["data1"], row.mybundle["data2"])

        The above example is available natively using :class:`.DictBundle`

        .. seealso::

            :class:`.DictBundle`

        """  # noqa: E501
        keyed_tuple = result_tuple(labels, [() for _ in labels])

        def proc(row: Row[Unpack[TupleAny]]) -> Any:
            return keyed_tuple([proc(row) for proc in procs])

        return proc
class DictBundle(Bundle[_T]):
    """Like :class:`.Bundle` but returns ``dict`` instances instead of
    named tuple like objects::

        bn = DictBundle("mybundle", MyClass.data1, MyClass.data2)
        for row in session.execute(select(bn).where(bn.c.data1 == "d1")):
            print(row.mybundle["data1"], row.mybundle["data2"])

    Differently from :class:`.Bundle`, multiple columns with the same name are
    not supported.

    .. versionadded:: 2.1

    .. seealso::

        :ref:`bundles`

        :class:`.Bundle`
    """

    def __init__(
        self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
    ) -> None:
        """Construct the bundle as :class:`.Bundle` does, then reject
        duplicate column names.

        :raises sqlalchemy.exc.ArgumentError: if two bundled expressions
         share a key.
        """
        super().__init__(name, *exprs, **kw)
        # a dictionary row cannot hold two values under one key
        if len(set(self.c.keys())) != len(self.c):
            raise sa_exc.ArgumentError(
                "DictBundle does not support duplicate column names"
            )

    def create_row_processor(
        self,
        query: Select[Unpack[TupleAny]],
        procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]],
        labels: Sequence[str],
    ) -> Callable[[Row[Unpack[TupleAny]]], dict[str, Any]]:
        """Return a row processor that maps each label to the value its
        processor extracts from the row."""

        def proc(row: Row[Unpack[TupleAny]]) -> dict[str, Any]:
            return dict(zip(labels, (proc(row) for proc in procs)))

        return proc
def _orm_full_deannotate(element: _SA) -> _SA:
    """Return ``element`` as processed by ``sql_util._deep_deannotate()``."""
    deannotated = sql_util._deep_deannotate(element)
    return deannotated
class _ORMJoin(expression.Join):
    """Extend Join to support ORM constructs as input."""

    __visit_name__ = expression.Join.__visit_name__

    inherit_cache = True

    def __init__(
        self,
        left: _FromClauseArgument,
        right: _FromClauseArgument,
        onclause: Optional[_OnClauseArgument] = None,
        isouter: bool = False,
        full: bool = False,
        _left_memo: Optional[Any] = None,
        _right_memo: Optional[Any] = None,
        _extra_criteria: Tuple[ColumnElement[bool], ...] = (),
    ):
        """Construct the join.

        :param left: left side; passed through ``inspection.inspect()``.
        :param right: right side; passed through ``inspection.inspect()``.
        :param onclause: optional ON clause.  A ``QueryableAttribute`` or
         ``MapperProperty`` is expanded into join conditions through the
         property's ``_create_joins()``.
        :param isouter: passed to ``Join.__init__``.
        :param full: passed to ``Join.__init__``.
        :param _left_memo: stored as ``self._left_memo``.
        :param _right_memo: stored as ``self._right_memo``.
        :param _extra_criteria: additional criteria; extended with the
         ``_extra_criteria`` of a ``QueryableAttribute`` onclause.
        """
        left_info = cast(
            "Union[FromClause, _InternalEntityType[Any]]",
            inspection.inspect(left),
        )

        right_info = cast(
            "Union[FromClause, _InternalEntityType[Any]]",
            inspection.inspect(right),
        )
        adapt_to = right_info.selectable

        # used by joined eager loader
        self._left_memo = _left_memo
        self._right_memo = _right_memo

        # work out whether the ON clause names a mapped property, and if
        # so which selectable that property is joined from
        if isinstance(onclause, attributes.QueryableAttribute):
            if TYPE_CHECKING:
                assert isinstance(
                    onclause.comparator, RelationshipProperty.Comparator
                )
            on_selectable = onclause.comparator._source_selectable()
            prop = onclause.property
            _extra_criteria += onclause._extra_criteria
        elif isinstance(onclause, MapperProperty):
            # used internally by joined eager loader...possibly not ideal
            prop = onclause
            on_selectable = prop.parent.selectable
        else:
            prop = None
            on_selectable = None

        left_selectable = left_info.selectable
        if prop:
            adapt_from: Optional[FromClause]
            if sql_util.clause_is_present(on_selectable, left_selectable):
                adapt_from = on_selectable
            else:
                assert isinstance(left_selectable, FromClause)
                adapt_from = left_selectable

            (
                pj,
                sj,
                source,
                dest,
                secondary,
                target_adapter,
            ) = prop._create_joins(
                source_selectable=adapt_from,
                dest_selectable=adapt_to,
                source_polymorphic=True,
                of_type_entity=right_info,
                alias_secondary=True,
                extra_criteria=_extra_criteria,
            )

            # a non-None sj means a secondary table is involved, so one
            # side of this join is itself replaced by a join to it
            if sj is not None:
                if isouter:
                    # note this is an inner join from secondary->right
                    right = sql.join(secondary, right, sj)
                    onclause = pj
                else:
                    left = sql.join(left, secondary, pj, isouter)
                    onclause = sj
            else:
                onclause = pj

            self._target_adapter = target_adapter

        # we don't use the normal coercions logic for _ORMJoin
        # (probably should), so do some gymnastics to get the entity.
        # logic here is for #8721, which was a major bug in 1.4
        # for almost two years, not reported/fixed until 1.4.43 (!)
        if is_selectable(left_info):
            parententity = left_selectable._annotations.get(
                "parententity", None
            )
        elif insp_is_mapper(left_info) or insp_is_aliased_class(left_info):
            parententity = left_info
        else:
            parententity = None

        if parententity is not None:
            self._annotations = self._annotations.union(
                {"parententity": parententity}
            )

        # with a property-based ON clause the extra criteria were already
        # handed to _create_joins(); otherwise they are ANDed in below
        augment_onclause = bool(_extra_criteria) and not prop
        expression.Join.__init__(self, left, right, onclause, isouter, full)

        assert self.onclause is not None

        if augment_onclause:
            self.onclause &= sql.and_(*_extra_criteria)

        if (
            not prop
            and getattr(right_info, "mapper", None)
            and right_info.mapper.single  # type: ignore
        ):
            right_info = cast("_InternalEntityType[Any]", right_info)
            # if single inheritance target and we are using a manual
            # or implicit ON clause, augment it the same way we'd augment the
            # WHERE.
            single_crit = right_info.mapper._single_table_criterion
            if single_crit is not None:
                if insp_is_aliased_class(right_info):
                    single_crit = right_info._adapter.traverse(single_crit)
                self.onclause = self.onclause & single_crit

    def _splice_into_center(self, other):
        """Splice a join into the center.

        Given join(a, b) and join(b, c), return join(a, b).join(c)

        """
        # walk down to the leftmost element of ``other``; it must be the
        # very object on the right side of this join
        leftmost = other
        while isinstance(leftmost, sql.Join):
            leftmost = leftmost.left

        assert self.right is leftmost

        left = _ORMJoin(
            self.left,
            other.left,
            self.onclause,
            isouter=self.isouter,
            _left_memo=self._left_memo,
            _right_memo=other._left_memo._path_registry,
        )

        return _ORMJoin(
            left,
            other.right,
            other.onclause,
            isouter=other.isouter,
            _right_memo=other._right_memo,
        )

    def join(
        self,
        right: _FromClauseArgument,
        onclause: Optional[_OnClauseArgument] = None,
        isouter: bool = False,
        full: bool = False,
    ) -> _ORMJoin:
        """Return a new ``_ORMJoin`` from this join to ``right``."""
        return _ORMJoin(self, right, onclause, full=full, isouter=isouter)

    def outerjoin(
        self,
        right: _FromClauseArgument,
        onclause: Optional[_OnClauseArgument] = None,
        full: bool = False,
    ) -> _ORMJoin:
        """Return a new ``_ORMJoin`` from this join to ``right`` with
        ``isouter=True``."""
        return _ORMJoin(self, right, onclause, isouter=True, full=full)
def with_parent(
    instance: object,
    prop: attributes.QueryableAttribute[Any],
    from_entity: Optional[_EntityType[Any]] = None,
) -> ColumnElement[bool]:
    """Create filtering criterion that relates this query's primary entity
    to the given related instance, using established
    :func:`_orm.relationship()`
    configuration.

    E.g.::

        stmt = select(Address).where(with_parent(some_user, User.addresses))

    The SQL rendered is the same as that rendered when a lazy loader
    would fire off from the given parent on that attribute, meaning
    that the appropriate state is taken from the parent object in
    Python without the need to render joins to the parent table
    in the rendered statement.

    The given property may also make use of :meth:`_orm.PropComparator.of_type`
    to indicate the left side of the criteria::


        a1 = aliased(Address)
        a2 = aliased(Address)
        stmt = select(a1, a2).where(with_parent(u1, User.addresses.of_type(a2)))

    The above use is equivalent to using the
    :paramref:`_orm.with_parent.from_entity` argument::

        a1 = aliased(Address)
        a2 = aliased(Address)
        stmt = select(a1, a2).where(
            with_parent(u1, User.addresses, from_entity=a2)
        )

    :param instance:
      An instance which has some :func:`_orm.relationship`.

    :param prop:
      Class-bound attribute, which indicates
      what relationship from the instance should be used to reconcile the
      parent/child relationship.

    :param from_entity:
      Entity in which to consider as the left side.  This defaults to the
      "zero" entity of the :class:`_query.Query` itself.

    :raises sqlalchemy.exc.ArgumentError: if ``prop`` is a string, or is
      a class-bound attribute that does not refer to a relationship.

    """  # noqa: E501
    prop_t: RelationshipProperty[Any]

    if isinstance(prop, str):
        raise sa_exc.ArgumentError(
            "with_parent() accepts class-bound mapped attributes, not strings"
        )
    elif isinstance(prop, attributes.QueryableAttribute):
        # an of_type() on the attribute takes precedence over from_entity
        if prop._of_type:
            from_entity = prop._of_type
        mapper_property = prop.property
        if mapper_property is None or not prop_is_relationship(
            mapper_property
        ):
            raise sa_exc.ArgumentError(
                "Expected relationship property for with_parent(), "
                f"got {mapper_property}"
            )
        prop_t = mapper_property
    else:
        # anything else is used directly as the relationship property
        prop_t = prop

    return prop_t._with_parent(instance, from_entity=from_entity)
def has_identity(object_: object) -> bool:
    """Return True if the given object has a database
    identity.

    This typically corresponds to the object being
    in either the persistent or detached state.

    .. seealso::

        :func:`.was_deleted`

    """
    return attributes.instance_state(object_).has_identity
def was_deleted(object_: object) -> bool:
    """Return True if the given object was deleted
    within a session flush.

    This is regardless of whether or not the object is
    persistent or detached.

    .. seealso::

        :attr:`.InstanceState.was_deleted`

    """
    return attributes.instance_state(object_).was_deleted
def _entity_corresponds_to(
    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
) -> bool:
    """Return True if ``given`` corresponds to ``entity``.

    "Corresponds" is meant in terms of an entity passed to Query that
    would match the same entity being referred to elsewhere in the query.

    """
    if insp_is_aliased_class(entity):
        # an aliased entity only matches another alias, and only when
        # both resolve to the very same base alias object
        if not insp_is_aliased_class(given):
            return False
        return entity._base_alias() is given._base_alias()

    if insp_is_aliased_class(given):
        # "entity" is not an alias here, "given" is
        if given._use_mapper_path:
            return entity in given.with_polymorphic_mappers
        return entity is given

    # neither side is an aliased class
    assert insp_is_mapper(given)
    return entity.common_parent(given)
def _entity_corresponds_to_use_path_impl(
    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
) -> bool:
    """Return True if ``given`` corresponds to ``entity`` along a path of
    loader options, where a mapped attribute is taken to be a member of a
    parent entity.

    e.g.::

        someoption(A).someoption(A.b)  # -> fn(A, A) -> True
        someoption(A).someoption(C.d)  # -> fn(A, C) -> False

        a1 = aliased(A)
        someoption(a1).someoption(A.b)  # -> fn(a1, A) -> False
        someoption(a1).someoption(a1.b)  # -> fn(a1, a1) -> True

        wp = with_polymorphic(A, [A1, A2])
        someoption(wp).someoption(A1.foo)  # -> fn(wp, A1) -> False
        someoption(wp).someoption(wp.A1.foo)  # -> fn(wp, wp.A1) -> True

    """
    if insp_is_aliased_class(given):
        # an aliased "given" only matches an aliased "entity" that does
        # not use the mapper path and is either the same object or one of
        # the with-polymorphic sub-entities of "given"
        return (
            insp_is_aliased_class(entity)
            and not entity._use_mapper_path
            and (entity is given or entity in given._with_polymorphic_entities)
        )

    if insp_is_aliased_class(entity):
        # "given" is not an alias, "entity" is
        return (
            entity._use_mapper_path
            and given in entity.with_polymorphic_mappers
        )

    # neither side is an aliased class
    return given.isa(entity.mapper)
def _entity_isa(given: _InternalEntityType[Any], mapper: Mapper[Any]) -> bool:
    """Return True if ``given`` "is a" ``mapper``, in the sense that the
    given entity would load rows of type ``mapper``.

    """
    if given.is_aliased_class:
        # for an aliased entity, the inheritance check goes through the
        # mapper it refers to
        return (
            mapper in given.with_polymorphic_mappers
            or given.mapper.isa(mapper)
        )

    if not given.with_polymorphic_mappers:
        # no with-polymorphic mappers to consult
        return given.isa(mapper)

    return mapper in given.with_polymorphic_mappers or given.isa(mapper)
2183def _getitem(iterable_query: Query[Any], item: Any) -> Any:
2184 """calculate __getitem__ in terms of an iterable query object
2185 that also has a slice() method.
2187 """
2189 def _no_negative_indexes():
2190 raise IndexError(
2191 "negative indexes are not accepted by SQL "
2192 "index / slice operators"
2193 )
2195 if isinstance(item, slice):
2196 start, stop, step = util.decode_slice(item)
2198 if (
2199 isinstance(stop, int)
2200 and isinstance(start, int)
2201 and stop - start <= 0
2202 ):
2203 return []
2205 elif (isinstance(start, int) and start < 0) or (
2206 isinstance(stop, int) and stop < 0
2207 ):
2208 _no_negative_indexes()
2210 res = iterable_query.slice(start, stop)
2211 if step is not None:
2212 return list(res)[None : None : item.step]
2213 else:
2214 return list(res)
2215 else:
2216 if item == -1:
2217 _no_negative_indexes()
2218 else:
2219 return list(iterable_query[item : item + 1])[0]
def _is_mapped_annotation(
    raw_annotation: _AnnotationScanType,
    cls: Type[Any],
    originating_cls: Type[Any],
) -> bool:
    """Return True if ``raw_annotation`` de-stringifies to an annotation
    whose origin is ``_MappedAnnotationBase``.

    The annotation is resolved against the module of ``originating_cls``.
    An annotation that fails to resolve with ``NameError`` is reported
    as not mapped.

    """
    try:
        resolved = de_stringify_annotation(
            cls, raw_annotation, originating_cls.__module__
        )
    except NameError:
        # in most cases, at least within our own tests, we can raise
        # here, which is more accurate as it prevents us from returning
        # false negatives.  However, in the real world, try to avoid getting
        # involved with end-user annotations that have nothing to do with us.
        # see issue #8888 where we bypass using this function in the case
        # that we want to detect an unresolvable Mapped[] type.
        return False

    return is_origin_of_cls(resolved, _MappedAnnotationBase)
2243class _CleanupError(Exception):
2244 pass
2247def _cleanup_mapped_str_annotation(
2248 annotation: str, originating_module: str
2249) -> str:
2250 # fix up an annotation that comes in as the form:
2251 # 'Mapped[List[Address]]' so that it instead looks like:
2252 # 'Mapped[List["Address"]]' , which will allow us to get
2253 # "Address" as a string
2255 # additionally, resolve symbols for these names since this is where
2256 # we'd have to do it
2258 inner: Optional[Match[str]]
2260 mm = re.match(r"^([^ \|]+?)\[(.+)\]$", annotation)
2262 if not mm:
2263 return annotation
2265 # ticket #8759. Resolve the Mapped name to a real symbol.
2266 # originally this just checked the name.
2267 try:
2268 obj = eval_name_only(mm.group(1), originating_module)
2269 except NameError as ne:
2270 raise _CleanupError(
2271 f'For annotation "{annotation}", could not resolve '
2272 f'container type "{mm.group(1)}". '
2273 "Please ensure this type is imported at the module level "
2274 "outside of TYPE_CHECKING blocks"
2275 ) from ne
2277 if obj is typing.ClassVar:
2278 real_symbol = "ClassVar"
2279 else:
2280 try:
2281 if issubclass(obj, _MappedAnnotationBase):
2282 real_symbol = obj.__name__
2283 else:
2284 return annotation
2285 except TypeError:
2286 # avoid isinstance(obj, type) check, just catch TypeError
2287 return annotation
2289 # note: if one of the codepaths above didn't define real_symbol and
2290 # then didn't return, real_symbol raises UnboundLocalError
2291 # which is actually a NameError, and the calling routines don't
2292 # notice this since they are catching NameError anyway. Just in case
2293 # this is being modified in the future, something to be aware of.
2295 stack = []
2296 inner = mm
2297 while True:
2298 stack.append(real_symbol if mm is inner else inner.group(1))
2299 g2 = inner.group(2)
2300 inner = re.match(r"^([^ \|]+?)\[(.+)\]$", g2)
2301 if inner is None:
2302 stack.append(g2)
2303 break
2305 # stacks we want to rewrite, that is, quote the last entry which
2306 # we think is a relationship class name:
2307 #
2308 # ['Mapped', 'List', 'Address']
2309 # ['Mapped', 'A']
2310 #
2311 # stacks we dont want to rewrite, which are generally MappedColumn
2312 # use cases:
2313 #
2314 # ['Mapped', "'Optional[Dict[str, str]]'"]
2315 # ['Mapped', 'dict[str, str] | None']
2317 if (
2318 # avoid already quoted symbols such as
2319 # ['Mapped', "'Optional[Dict[str, str]]'"]
2320 not re.match(r"""^["'].*["']$""", stack[-1])
2321 # avoid further generics like Dict[] such as
2322 # ['Mapped', 'dict[str, str] | None'],
2323 # ['Mapped', 'list[int] | list[str]'],
2324 # ['Mapped', 'Union[list[int], list[str]]'],
2325 and not re.search(r"[\[\]]", stack[-1])
2326 ):
2327 stripchars = "\"' "
2328 stack[-1] = ", ".join(
2329 f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",")
2330 )
2332 annotation = "[".join(stack) + ("]" * (len(stack) - 1))
2334 return annotation
2337def _extract_mapped_subtype(
2338 raw_annotation: Optional[_AnnotationScanType],
2339 cls: type,
2340 originating_module: str,
2341 key: str,
2342 attr_cls: Type[Any],
2343 required: bool,
2344 is_dataclass_field: bool,
2345 expect_mapped: bool = True,
2346 raiseerr: bool = True,
2347) -> Optional[Tuple[Union[_AnnotationScanType, str], Optional[type]]]:
2348 """given an annotation, figure out if it's ``Mapped[something]`` and if
2349 so, return the ``something`` part.
2351 Includes error raise scenarios and other options.
2353 """
2355 if raw_annotation is None:
2356 if required:
2357 raise orm_exc.MappedAnnotationError(
2358 f"Python typing annotation is required for attribute "
2359 f'"{cls.__name__}.{key}" when primary argument(s) for '
2360 f'"{attr_cls.__name__}" construct are None or not present'
2361 )
2362 return None
2364 try:
2365 # destringify the "outside" of the annotation. note we are not
2366 # adding include_generic so it will *not* dig into generic contents,
2367 # which will remain as ForwardRef or plain str under future annotations
2368 # mode. The full destringify happens later when mapped_column goes
2369 # to do a full lookup in the registry type_annotations_map.
2370 annotated = de_stringify_annotation(
2371 cls,
2372 raw_annotation,
2373 originating_module,
2374 str_cleanup_fn=_cleanup_mapped_str_annotation,
2375 )
2376 except _CleanupError as ce:
2377 raise orm_exc.MappedAnnotationError(
2378 f"Could not interpret annotation {raw_annotation}. "
2379 "Check that it uses names that are correctly imported at the "
2380 "module level. See chained stack trace for more hints."
2381 ) from ce
2382 except NameError as ne:
2383 if raiseerr and "Mapped[" in raw_annotation: # type: ignore
2384 raise orm_exc.MappedAnnotationError(
2385 f"Could not interpret annotation {raw_annotation}. "
2386 "Check that it uses names that are correctly imported at the "
2387 "module level. See chained stack trace for more hints."
2388 ) from ne
2390 annotated = raw_annotation # type: ignore
2392 if is_dataclass_field:
2393 return annotated, None
2394 else:
2395 if not hasattr(annotated, "__origin__") or not is_origin_of_cls(
2396 annotated, _MappedAnnotationBase
2397 ):
2398 if expect_mapped:
2399 if not raiseerr:
2400 return None
2402 origin = getattr(annotated, "__origin__", None)
2403 if origin is typing.ClassVar:
2404 return None
2406 # check for other kind of ORM descriptor like AssociationProxy,
2407 # don't raise for that (issue #9957)
2408 elif isinstance(origin, type) and issubclass(
2409 origin, ORMDescriptor
2410 ):
2411 return None
2413 raise orm_exc.MappedAnnotationError(
2414 f'Type annotation for "{cls.__name__}.{key}" '
2415 "can't be correctly interpreted for "
2416 "Annotated Declarative Table form. ORM annotations "
2417 "should normally make use of the ``Mapped[]`` generic "
2418 "type, or other ORM-compatible generic type, as a "
2419 "container for the actual type, which indicates the "
2420 "intent that the attribute is mapped. "
2421 "Class variables that are not intended to be mapped "
2422 "by the ORM should use ClassVar[]. "
2423 "To allow Annotated Declarative to disregard legacy "
2424 "annotations which don't use Mapped[] to pass, set "
2425 '"__allow_unmapped__ = True" on the class or a '
2426 "superclass this class.",
2427 code="zlpr",
2428 )
2430 else:
2431 return annotated, None
2433 generic_annotated = cast(GenericProtocol[Any], annotated)
2434 if len(generic_annotated.__args__) != 1:
2435 raise orm_exc.MappedAnnotationError(
2436 "Expected sub-type for Mapped[] annotation"
2437 )
2439 return (
2440 # fix dict/list/set args to be ForwardRef, see #11814
2441 fixup_container_fwd_refs(generic_annotated.__args__[0]),
2442 generic_annotated.__origin__,
2443 )
def _mapper_property_as_plain_name(prop: Type[Any]) -> str:
    """Return a plain name for the given mapper property class.

    If ``prop`` provides a ``_mapper_property_name()`` callable, its
    result is passed along as the name; otherwise ``None`` is passed.
    The final string is produced by ``util.clsname_as_plain_name()``.

    """
    name = (
        prop._mapper_property_name()
        if hasattr(prop, "_mapper_property_name")
        else None
    )
    return util.clsname_as_plain_name(prop, name)