Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/util.py: 33%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# orm/util.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
9from __future__ import annotations
11import enum
12import functools
13import re
14import types
15import typing
16from typing import AbstractSet
17from typing import Any
18from typing import Callable
19from typing import cast
20from typing import Dict
21from typing import FrozenSet
22from typing import Generic
23from typing import get_origin
24from typing import Iterable
25from typing import Iterator
26from typing import List
27from typing import Literal
28from typing import Match
29from typing import Optional
30from typing import Protocol
31from typing import Sequence
32from typing import Tuple
33from typing import Type
34from typing import TYPE_CHECKING
35from typing import TypeVar
36from typing import Union
37import weakref
39from . import attributes # noqa
40from . import exc as orm_exc
41from ._typing import _O
42from ._typing import insp_is_aliased_class
43from ._typing import insp_is_mapper
44from ._typing import prop_is_relationship
45from .base import _class_to_mapper as _class_to_mapper
46from .base import _MappedAnnotationBase
47from .base import _never_set as _never_set # noqa: F401
48from .base import _none_only_set as _none_only_set # noqa: F401
49from .base import _none_set as _none_set # noqa: F401
50from .base import attribute_str as attribute_str # noqa: F401
51from .base import class_mapper as class_mapper
52from .base import DynamicMapped
53from .base import InspectionAttr as InspectionAttr
54from .base import instance_str as instance_str # noqa: F401
55from .base import Mapped
56from .base import object_mapper as object_mapper
57from .base import object_state as object_state # noqa: F401
58from .base import opt_manager_of_class
59from .base import ORMDescriptor
60from .base import state_attribute_str as state_attribute_str # noqa: F401
61from .base import state_class_str as state_class_str # noqa: F401
62from .base import state_str as state_str # noqa: F401
63from .base import WriteOnlyMapped
64from .interfaces import CriteriaOption
65from .interfaces import MapperProperty as MapperProperty
66from .interfaces import ORMColumnsClauseRole
67from .interfaces import ORMEntityColumnsClauseRole
68from .interfaces import ORMFromClauseRole
69from .path_registry import PathRegistry as PathRegistry
70from .. import event
71from .. import exc as sa_exc
72from .. import inspection
73from .. import sql
74from .. import util
75from ..engine.result import result_tuple
76from ..sql import coercions
77from ..sql import expression
78from ..sql import lambdas
79from ..sql import roles
80from ..sql import util as sql_util
81from ..sql import visitors
82from ..sql._typing import is_selectable
83from ..sql.annotation import SupportsCloneAnnotations
84from ..sql.base import ColumnCollection
85from ..sql.cache_key import HasCacheKey
86from ..sql.cache_key import MemoizedHasCacheKey
87from ..sql.elements import ColumnElement
88from ..sql.elements import KeyedColumnElement
89from ..sql.selectable import FromClause
90from ..util.langhelpers import MemoizedSlots
91from ..util.typing import de_stringify_annotation as _de_stringify_annotation
92from ..util.typing import eval_name_only as _eval_name_only
93from ..util.typing import fixup_container_fwd_refs
94from ..util.typing import GenericProtocol
95from ..util.typing import is_origin_of_cls
96from ..util.typing import TupleAny
97from ..util.typing import Unpack
99if typing.TYPE_CHECKING:
100 from ._typing import _EntityType
101 from ._typing import _IdentityKeyType
102 from ._typing import _InternalEntityType
103 from ._typing import _ORMCOLEXPR
104 from .context import _MapperEntity
105 from .context import _ORMCompileState
106 from .mapper import Mapper
107 from .path_registry import _AbstractEntityRegistry
108 from .query import Query
109 from .relationships import RelationshipProperty
110 from ..engine import Row
111 from ..engine import RowMapping
112 from ..sql._typing import _CE
113 from ..sql._typing import _ColumnExpressionArgument
114 from ..sql._typing import _EquivalentColumnMap
115 from ..sql._typing import _FromClauseArgument
116 from ..sql._typing import _OnClauseArgument
117 from ..sql._typing import _PropagateAttrsType
118 from ..sql.annotation import _SA
119 from ..sql.base import ReadOnlyColumnCollection
120 from ..sql.elements import BindParameter
121 from ..sql.selectable import _ColumnsClauseElement
122 from ..sql.selectable import Select
123 from ..sql.selectable import Selectable
124 from ..sql.visitors import anon_map
125 from ..util.typing import _AnnotationScanType
126 from ..util.typing import _MatchedOnType
128_T = TypeVar("_T", bound=Any)
130all_cascades = frozenset(
131 (
132 "delete",
133 "delete-orphan",
134 "all",
135 "merge",
136 "expunge",
137 "save-update",
138 "refresh-expire",
139 "none",
140 )
141)
143_de_stringify_partial = functools.partial(
144 functools.partial,
145 locals_=util.immutabledict(
146 {
147 "Mapped": Mapped,
148 "WriteOnlyMapped": WriteOnlyMapped,
149 "DynamicMapped": DynamicMapped,
150 }
151 ),
152)
154# partial is practically useless as we have to write out the whole
155# function and maintain the signature anyway
158class _DeStringifyAnnotation(Protocol):
159 def __call__(
160 self,
161 cls: Type[Any],
162 annotation: _AnnotationScanType,
163 originating_module: str,
164 *,
165 str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
166 include_generic: bool = False,
167 ) -> _MatchedOnType: ...
170de_stringify_annotation = cast(
171 _DeStringifyAnnotation, _de_stringify_partial(_de_stringify_annotation)
172)
175class _EvalNameOnly(Protocol):
176 def __call__(self, name: str, module_name: str) -> Any: ...
179eval_name_only = cast(_EvalNameOnly, _de_stringify_partial(_eval_name_only))
182class CascadeOptions(FrozenSet[str]):
183 """Keeps track of the options sent to
184 :paramref:`.relationship.cascade`"""
186 _add_w_all_cascades = all_cascades.difference(
187 ["all", "none", "delete-orphan"]
188 )
189 _allowed_cascades = all_cascades
191 _viewonly_cascades = ["expunge", "all", "none", "refresh-expire", "merge"]
193 __slots__ = (
194 "save_update",
195 "delete",
196 "refresh_expire",
197 "merge",
198 "expunge",
199 "delete_orphan",
200 )
202 save_update: bool
203 delete: bool
204 refresh_expire: bool
205 merge: bool
206 expunge: bool
207 delete_orphan: bool
209 def __new__(
210 cls, value_list: Optional[Union[Iterable[str], str]]
211 ) -> CascadeOptions:
212 if isinstance(value_list, str) or value_list is None:
213 return cls.from_string(value_list) # type: ignore
214 values = set(value_list)
215 if values.difference(cls._allowed_cascades):
216 raise sa_exc.ArgumentError(
217 "Invalid cascade option(s): %s"
218 % ", ".join(
219 [
220 repr(x)
221 for x in sorted(
222 values.difference(cls._allowed_cascades)
223 )
224 ]
225 )
226 )
228 if "all" in values:
229 values.update(cls._add_w_all_cascades)
230 if "none" in values:
231 values.clear()
232 values.discard("all")
234 self = super().__new__(cls, values)
235 self.save_update = "save-update" in values
236 self.delete = "delete" in values
237 self.refresh_expire = "refresh-expire" in values
238 self.merge = "merge" in values
239 self.expunge = "expunge" in values
240 self.delete_orphan = "delete-orphan" in values
242 if self.delete_orphan and not self.delete:
243 util.warn("The 'delete-orphan' cascade option requires 'delete'.")
244 return self
246 def __repr__(self):
247 return "CascadeOptions(%r)" % (",".join([x for x in sorted(self)]))
249 @classmethod
250 def from_string(cls, arg):
251 values = [c for c in re.split(r"\s*,\s*", arg or "") if c]
252 return cls(values)
255def _validator_events(desc, key, validator, include_removes, include_backrefs):
256 """Runs a validation method on an attribute value to be set or
257 appended.
258 """
260 if not include_backrefs:
262 def detect_is_backref(state, initiator):
263 impl = state.manager[key].impl
264 return initiator.impl is not impl
266 if include_removes:
268 def append(state, value, initiator):
269 if initiator.op is not attributes.OP_BULK_REPLACE and (
270 include_backrefs or not detect_is_backref(state, initiator)
271 ):
272 return validator(state.obj(), key, value, False)
273 else:
274 return value
276 def bulk_set(state, values, initiator):
277 if include_backrefs or not detect_is_backref(state, initiator):
278 obj = state.obj()
279 values[:] = [
280 validator(obj, key, value, False) for value in values
281 ]
283 def set_(state, value, oldvalue, initiator):
284 if include_backrefs or not detect_is_backref(state, initiator):
285 return validator(state.obj(), key, value, False)
286 else:
287 return value
289 def remove(state, value, initiator):
290 if include_backrefs or not detect_is_backref(state, initiator):
291 validator(state.obj(), key, value, True)
293 else:
295 def append(state, value, initiator):
296 if initiator.op is not attributes.OP_BULK_REPLACE and (
297 include_backrefs or not detect_is_backref(state, initiator)
298 ):
299 return validator(state.obj(), key, value)
300 else:
301 return value
303 def bulk_set(state, values, initiator):
304 if include_backrefs or not detect_is_backref(state, initiator):
305 obj = state.obj()
306 values[:] = [validator(obj, key, value) for value in values]
308 def set_(state, value, oldvalue, initiator):
309 if include_backrefs or not detect_is_backref(state, initiator):
310 return validator(state.obj(), key, value)
311 else:
312 return value
314 event.listen(desc, "append", append, raw=True, retval=True)
315 event.listen(desc, "bulk_replace", bulk_set, raw=True)
316 event.listen(desc, "set", set_, raw=True, retval=True)
317 if include_removes:
318 event.listen(desc, "remove", remove, raw=True, retval=True)
321def polymorphic_union(
322 table_map, typecolname, aliasname="p_union", cast_nulls=True
323):
324 """Create a ``UNION`` statement used by a polymorphic mapper.
326 See :ref:`concrete_inheritance` for an example of how
327 this is used.
329 :param table_map: mapping of polymorphic identities to
330 :class:`_schema.Table` objects.
331 :param typecolname: string name of a "discriminator" column, which will be
332 derived from the query, producing the polymorphic identity for
333 each row. If ``None``, no polymorphic discriminator is generated.
334 :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()`
335 construct generated.
336 :param cast_nulls: if True, non-existent columns, which are represented
337 as labeled NULLs, will be passed into CAST. This is a legacy behavior
338 that is problematic on some backends such as Oracle - in which case it
339 can be set to False.
341 """
343 colnames: util.OrderedSet[str] = util.OrderedSet()
344 colnamemaps = {}
345 types = {}
346 for key in table_map:
347 table = table_map[key]
349 table = coercions.expect(roles.FromClauseRole, table)
350 table_map[key] = table
352 m = {}
353 for c in table.c:
354 if c.key == typecolname:
355 raise sa_exc.InvalidRequestError(
356 "Polymorphic union can't use '%s' as the discriminator "
357 "column due to mapped column %r; please apply the "
358 "'typecolname' "
359 "argument; this is available on "
360 "ConcreteBase as '_concrete_discriminator_name'"
361 % (typecolname, c)
362 )
363 colnames.add(c.key)
364 m[c.key] = c
365 types[c.key] = c.type
366 colnamemaps[table] = m
368 def col(name, table):
369 try:
370 return colnamemaps[table][name]
371 except KeyError:
372 if cast_nulls:
373 return sql.cast(sql.null(), types[name]).label(name)
374 else:
375 return sql.type_coerce(sql.null(), types[name]).label(name)
377 result = []
378 for type_, table in table_map.items():
379 if typecolname is not None:
380 result.append(
381 sql.select(
382 *(
383 [col(name, table) for name in colnames]
384 + [
385 sql.literal_column(
386 sql_util._quote_ddl_expr(type_)
387 ).label(typecolname)
388 ]
389 )
390 ).select_from(table)
391 )
392 else:
393 result.append(
394 sql.select(
395 *[col(name, table) for name in colnames]
396 ).select_from(table)
397 )
398 return sql.union_all(*result).alias(aliasname)
401def identity_key(
402 class_: Optional[Type[_T]] = None,
403 ident: Union[Any, Tuple[Any, ...]] = None,
404 *,
405 instance: Optional[_T] = None,
406 row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None,
407 identity_token: Optional[Any] = None,
408) -> _IdentityKeyType[_T]:
409 r"""Generate "identity key" tuples, as are used as keys in the
410 :attr:`.Session.identity_map` dictionary.
412 This function has several call styles:
414 * ``identity_key(class, ident, identity_token=token)``
416 This form receives a mapped class and a primary key scalar or
417 tuple as an argument.
419 E.g.::
421 >>> identity_key(MyClass, (1, 2))
422 (<class '__main__.MyClass'>, (1, 2), None)
424 :param class: mapped class (must be a positional argument)
425 :param ident: primary key, may be a scalar or tuple argument.
426 :param identity_token: optional identity token
428 * ``identity_key(instance=instance)``
430 This form will produce the identity key for a given instance. The
431 instance need not be persistent, only that its primary key attributes
432 are populated (else the key will contain ``None`` for those missing
433 values).
435 E.g.::
437 >>> instance = MyClass(1, 2)
438 >>> identity_key(instance=instance)
439 (<class '__main__.MyClass'>, (1, 2), None)
441 In this form, the given instance is ultimately run though
442 :meth:`_orm.Mapper.identity_key_from_instance`, which will have the
443 effect of performing a database check for the corresponding row
444 if the object is expired.
446 :param instance: object instance (must be given as a keyword arg)
448 * ``identity_key(class, row=row, identity_token=token)``
450 This form is similar to the class/tuple form, except is passed a
451 database result row as a :class:`.Row` or :class:`.RowMapping` object.
453 E.g.::
455 >>> row = engine.execute(text("select * from table where a=1 and b=2")).first()
456 >>> identity_key(MyClass, row=row)
457 (<class '__main__.MyClass'>, (1, 2), None)
459 :param class: mapped class (must be a positional argument)
460 :param row: :class:`.Row` row returned by a :class:`_engine.CursorResult`
461 (must be given as a keyword arg)
462 :param identity_token: optional identity token
464 """ # noqa: E501
465 if class_ is not None:
466 mapper = class_mapper(class_)
467 if row is None:
468 if ident is None:
469 raise sa_exc.ArgumentError("ident or row is required")
470 return mapper.identity_key_from_primary_key(
471 tuple(util.to_list(ident)), identity_token=identity_token
472 )
473 else:
474 return mapper.identity_key_from_row(
475 row, identity_token=identity_token
476 )
477 elif instance is not None:
478 mapper = object_mapper(instance)
479 return mapper.identity_key_from_instance(instance)
480 else:
481 raise sa_exc.ArgumentError("class or instance is required")
484class _TraceAdaptRole(enum.Enum):
485 """Enumeration of all the use cases for ORMAdapter.
487 ORMAdapter remains one of the most complicated aspects of the ORM, as it is
488 used for in-place adaption of column expressions to be applied to a SELECT,
489 replacing :class:`.Table` and other objects that are mapped to classes with
490 aliases of those tables in the case of joined eager loading, or in the case
491 of polymorphic loading as used with concrete mappings or other custom "with
492 polymorphic" parameters, with whole user-defined subqueries. The
493 enumerations provide an overview of all the use cases used by ORMAdapter, a
494 layer of formality as to the introduction of new ORMAdapter use cases (of
495 which none are anticipated), as well as a means to trace the origins of a
496 particular ORMAdapter within runtime debugging.
498 SQLAlchemy 2.0 has greatly scaled back ORM features which relied heavily on
499 open-ended statement adaption, including the ``Query.with_polymorphic()``
500 method and the ``Query.select_from_entity()`` methods, favoring
501 user-explicit aliasing schemes using the ``aliased()`` and
502 ``with_polymorphic()`` standalone constructs; these still use adaption,
503 however the adaption is applied in a narrower scope.
505 """
507 # aliased() use that is used to adapt individual attributes at query
508 # construction time
509 ALIASED_INSP = enum.auto()
511 # joinedload cases; typically adapt an ON clause of a relationship
512 # join
513 JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto()
514 JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto()
515 JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto()
517 # polymorphic cases - these are complex ones that replace FROM
518 # clauses, replacing tables with subqueries
519 MAPPER_POLYMORPHIC_ADAPTER = enum.auto()
520 WITH_POLYMORPHIC_ADAPTER = enum.auto()
521 WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto()
522 DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto()
524 # the from_statement() case, used only to adapt individual attributes
525 # from a given statement to local ORM attributes at result fetching
526 # time. assigned to ORMCompileState._from_obj_alias
527 ADAPT_FROM_STATEMENT = enum.auto()
529 # the joinedload for queries that have LIMIT/OFFSET/DISTINCT case;
530 # the query is placed inside of a subquery with the LIMIT/OFFSET/etc.,
531 # joinedloads are then placed on the outside.
532 # assigned to ORMCompileState.compound_eager_adapter
533 COMPOUND_EAGER_STATEMENT = enum.auto()
535 # the legacy Query._set_select_from() case.
536 # this is needed for Query's set operations (i.e. UNION, etc. )
537 # as well as "legacy from_self()", which while removed from 2.0 as
538 # public API, is used for the Query.count() method. this one
539 # still does full statement traversal
540 # assigned to ORMCompileState._from_obj_alias
541 LEGACY_SELECT_FROM_ALIAS = enum.auto()
544class ORMStatementAdapter(sql_util.ColumnAdapter):
545 """ColumnAdapter which includes a role attribute."""
547 __slots__ = ("role",)
549 def __init__(
550 self,
551 role: _TraceAdaptRole,
552 selectable: Selectable,
553 *,
554 equivalents: Optional[_EquivalentColumnMap] = None,
555 adapt_required: bool = False,
556 allow_label_resolve: bool = True,
557 anonymize_labels: bool = False,
558 adapt_on_names: bool = False,
559 adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
560 ):
561 self.role = role
562 super().__init__(
563 selectable,
564 equivalents=equivalents,
565 adapt_required=adapt_required,
566 allow_label_resolve=allow_label_resolve,
567 anonymize_labels=anonymize_labels,
568 adapt_on_names=adapt_on_names,
569 adapt_from_selectables=adapt_from_selectables,
570 )
573class ORMAdapter(sql_util.ColumnAdapter):
574 """ColumnAdapter subclass which excludes adaptation of entities from
575 non-matching mappers.
577 """
579 __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp")
581 is_aliased_class: bool
582 aliased_insp: Optional[AliasedInsp[Any]]
584 def __init__(
585 self,
586 role: _TraceAdaptRole,
587 entity: _InternalEntityType[Any],
588 *,
589 equivalents: Optional[_EquivalentColumnMap] = None,
590 adapt_required: bool = False,
591 allow_label_resolve: bool = True,
592 anonymize_labels: bool = False,
593 selectable: Optional[Selectable] = None,
594 limit_on_entity: bool = True,
595 adapt_on_names: bool = False,
596 adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
597 ):
598 self.role = role
599 self.mapper = entity.mapper
600 if selectable is None:
601 selectable = entity.selectable
602 if insp_is_aliased_class(entity):
603 self.is_aliased_class = True
604 self.aliased_insp = entity
605 else:
606 self.is_aliased_class = False
607 self.aliased_insp = None
609 super().__init__(
610 selectable,
611 equivalents,
612 adapt_required=adapt_required,
613 allow_label_resolve=allow_label_resolve,
614 anonymize_labels=anonymize_labels,
615 include_fn=self._include_fn if limit_on_entity else None,
616 adapt_on_names=adapt_on_names,
617 adapt_from_selectables=adapt_from_selectables,
618 )
620 def _include_fn(self, elem):
621 entity = elem._annotations.get("parentmapper", None)
623 return not entity or entity.isa(self.mapper) or self.mapper.isa(entity)
626class AliasedClass(
627 inspection.Inspectable["AliasedInsp[_O]"], ORMColumnsClauseRole[_O]
628):
629 r"""Represents an "aliased" form of a mapped class for usage with Query.
631 The ORM equivalent of a :func:`~sqlalchemy.sql.expression.alias`
632 construct, this object mimics the mapped class using a
633 ``__getattr__`` scheme and maintains a reference to a
634 real :class:`~sqlalchemy.sql.expression.Alias` object.
636 A primary purpose of :class:`.AliasedClass` is to serve as an alternate
637 within a SQL statement generated by the ORM, such that an existing
638 mapped entity can be used in multiple contexts. A simple example::
640 # find all pairs of users with the same name
641 user_alias = aliased(User)
642 session.query(User, user_alias).join(
643 (user_alias, User.id > user_alias.id)
644 ).filter(User.name == user_alias.name)
646 :class:`.AliasedClass` is also capable of mapping an existing mapped
647 class to an entirely new selectable, provided this selectable is column-
648 compatible with the existing mapped selectable, and it can also be
649 configured in a mapping as the target of a :func:`_orm.relationship`.
650 See the links below for examples.
652 The :class:`.AliasedClass` object is constructed typically using the
653 :func:`_orm.aliased` function. It also is produced with additional
654 configuration when using the :func:`_orm.with_polymorphic` function.
656 The resulting object is an instance of :class:`.AliasedClass`.
657 This object implements an attribute scheme which produces the
658 same attribute and method interface as the original mapped
659 class, allowing :class:`.AliasedClass` to be compatible
660 with any attribute technique which works on the original class,
661 including hybrid attributes (see :ref:`hybrids_toplevel`).
663 The :class:`.AliasedClass` can be inspected for its underlying
664 :class:`_orm.Mapper`, aliased selectable, and other information
665 using :func:`_sa.inspect`::
667 from sqlalchemy import inspect
669 my_alias = aliased(MyClass)
670 insp = inspect(my_alias)
672 The resulting inspection object is an instance of :class:`.AliasedInsp`.
675 .. seealso::
677 :func:`.aliased`
679 :func:`.with_polymorphic`
681 :ref:`relationship_aliased_class`
683 :ref:`relationship_to_window_function`
686 """
688 __name__: str
690 def __init__(
691 self,
692 mapped_class_or_ac: _EntityType[_O],
693 alias: Optional[FromClause] = None,
694 name: Optional[str] = None,
695 flat: bool = False,
696 adapt_on_names: bool = False,
697 with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]] = None,
698 with_polymorphic_discriminator: Optional[ColumnElement[Any]] = None,
699 base_alias: Optional[AliasedInsp[Any]] = None,
700 use_mapper_path: bool = False,
701 represents_outer_join: bool = False,
702 ):
703 insp = cast(
704 "_InternalEntityType[_O]", inspection.inspect(mapped_class_or_ac)
705 )
706 mapper = insp.mapper
708 nest_adapters = False
710 if alias is None:
711 if insp.is_aliased_class and insp.selectable._is_subquery:
712 alias = insp.selectable.alias()
713 else:
714 alias = (
715 mapper._with_polymorphic_selectable._anonymous_fromclause(
716 name=name,
717 flat=flat,
718 )
719 )
720 elif insp.is_aliased_class:
721 nest_adapters = True
723 assert alias is not None
724 self._aliased_insp = AliasedInsp(
725 self,
726 insp,
727 alias,
728 name,
729 (
730 with_polymorphic_mappers
731 if with_polymorphic_mappers
732 else mapper.with_polymorphic_mappers
733 ),
734 (
735 with_polymorphic_discriminator
736 if with_polymorphic_discriminator is not None
737 else mapper.polymorphic_on
738 ),
739 base_alias,
740 use_mapper_path,
741 adapt_on_names,
742 represents_outer_join,
743 nest_adapters,
744 )
746 self.__name__ = f"aliased({mapper.class_.__name__})"
748 @classmethod
749 def _reconstitute_from_aliased_insp(
750 cls, aliased_insp: AliasedInsp[_O]
751 ) -> AliasedClass[_O]:
752 obj = cls.__new__(cls)
753 obj.__name__ = f"aliased({aliased_insp.mapper.class_.__name__})"
754 obj._aliased_insp = aliased_insp
756 if aliased_insp._is_with_polymorphic:
757 for sub_aliased_insp in aliased_insp._with_polymorphic_entities:
758 if sub_aliased_insp is not aliased_insp:
759 ent = AliasedClass._reconstitute_from_aliased_insp(
760 sub_aliased_insp
761 )
762 setattr(obj, sub_aliased_insp.class_.__name__, ent)
764 return obj
766 def __getattr__(self, key: str) -> Any:
767 try:
768 _aliased_insp = self.__dict__["_aliased_insp"]
769 except KeyError:
770 raise AttributeError()
771 else:
772 target = _aliased_insp._target
773 # maintain all getattr mechanics
774 attr = getattr(target, key)
776 # attribute is a method, that will be invoked against a
777 # "self"; so just return a new method with the same function and
778 # new self
779 if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
780 return types.MethodType(attr.__func__, self)
782 # attribute is a descriptor, that will be invoked against a
783 # "self"; so invoke the descriptor against this self
784 if hasattr(attr, "__get__"):
785 attr = attr.__get__(None, self)
787 # attributes within the QueryableAttribute system will want this
788 # to be invoked so the object can be adapted
789 if hasattr(attr, "adapt_to_entity"):
790 attr = attr.adapt_to_entity(_aliased_insp)
791 setattr(self, key, attr)
793 return attr
795 def _get_from_serialized(
796 self, key: str, mapped_class: _O, aliased_insp: AliasedInsp[_O]
797 ) -> Any:
798 # this method is only used in terms of the
799 # sqlalchemy.ext.serializer extension
800 attr = getattr(mapped_class, key)
801 if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
802 return types.MethodType(attr.__func__, self)
804 # attribute is a descriptor, that will be invoked against a
805 # "self"; so invoke the descriptor against this self
806 if hasattr(attr, "__get__"):
807 attr = attr.__get__(None, self)
809 # attributes within the QueryableAttribute system will want this
810 # to be invoked so the object can be adapted
811 if hasattr(attr, "adapt_to_entity"):
812 aliased_insp._weak_entity = weakref.ref(self)
813 attr = attr.adapt_to_entity(aliased_insp)
814 setattr(self, key, attr)
816 return attr
818 def __repr__(self) -> str:
819 return "<AliasedClass at 0x%x; %s>" % (
820 id(self),
821 self._aliased_insp._target.__name__,
822 )
824 def __str__(self) -> str:
825 return str(self._aliased_insp)
828@inspection._self_inspects
829class AliasedInsp(
830 ORMEntityColumnsClauseRole[_O],
831 ORMFromClauseRole,
832 HasCacheKey,
833 InspectionAttr,
834 MemoizedSlots,
835 inspection.Inspectable["AliasedInsp[_O]"],
836 Generic[_O],
837):
838 """Provide an inspection interface for an
839 :class:`.AliasedClass` object.
841 The :class:`.AliasedInsp` object is returned
842 given an :class:`.AliasedClass` using the
843 :func:`_sa.inspect` function::
845 from sqlalchemy import inspect
846 from sqlalchemy.orm import aliased
848 my_alias = aliased(MyMappedClass)
849 insp = inspect(my_alias)
851 Attributes on :class:`.AliasedInsp`
852 include:
854 * ``entity`` - the :class:`.AliasedClass` represented.
855 * ``mapper`` - the :class:`_orm.Mapper` mapping the underlying class.
856 * ``selectable`` - the :class:`_expression.Alias`
857 construct which ultimately
858 represents an aliased :class:`_schema.Table` or
859 :class:`_expression.Select`
860 construct.
861 * ``name`` - the name of the alias. Also is used as the attribute
862 name when returned in a result tuple from :class:`_query.Query`.
863 * ``with_polymorphic_mappers`` - collection of :class:`_orm.Mapper`
864 objects
865 indicating all those mappers expressed in the select construct
866 for the :class:`.AliasedClass`.
867 * ``polymorphic_on`` - an alternate column or SQL expression which
868 will be used as the "discriminator" for a polymorphic load.
870 .. seealso::
872 :ref:`inspection_toplevel`
874 """
876 __slots__ = (
877 "__weakref__",
878 "_weak_entity",
879 "mapper",
880 "selectable",
881 "name",
882 "_adapt_on_names",
883 "with_polymorphic_mappers",
884 "polymorphic_on",
885 "_use_mapper_path",
886 "_base_alias",
887 "represents_outer_join",
888 "persist_selectable",
889 "local_table",
890 "_is_with_polymorphic",
891 "_with_polymorphic_entities",
892 "_adapter",
893 "_target",
894 "__clause_element__",
895 "_memoized_values",
896 "_all_column_expressions",
897 "_nest_adapters",
898 )
900 _cache_key_traversal = [
901 ("name", visitors.ExtendedInternalTraversal.dp_string),
902 ("_adapt_on_names", visitors.ExtendedInternalTraversal.dp_boolean),
903 ("_use_mapper_path", visitors.ExtendedInternalTraversal.dp_boolean),
904 ("_target", visitors.ExtendedInternalTraversal.dp_inspectable),
905 ("selectable", visitors.ExtendedInternalTraversal.dp_clauseelement),
906 (
907 "with_polymorphic_mappers",
908 visitors.InternalTraversal.dp_has_cache_key_list,
909 ),
910 ("polymorphic_on", visitors.InternalTraversal.dp_clauseelement),
911 ]
913 mapper: Mapper[_O]
914 selectable: FromClause
915 _adapter: ORMAdapter
916 with_polymorphic_mappers: Sequence[Mapper[Any]]
917 _with_polymorphic_entities: Sequence[AliasedInsp[Any]]
919 _weak_entity: weakref.ref[AliasedClass[_O]]
920 """the AliasedClass that refers to this AliasedInsp"""
922 _target: Union[Type[_O], AliasedClass[_O]]
923 """the thing referenced by the AliasedClass/AliasedInsp.
925 In the vast majority of cases, this is the mapped class. However
926 it may also be another AliasedClass (alias of alias).
928 """
930 def __init__(
931 self,
932 entity: AliasedClass[_O],
933 inspected: _InternalEntityType[_O],
934 selectable: FromClause,
935 name: Optional[str],
936 with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]],
937 polymorphic_on: Optional[ColumnElement[Any]],
938 _base_alias: Optional[AliasedInsp[Any]],
939 _use_mapper_path: bool,
940 adapt_on_names: bool,
941 represents_outer_join: bool,
942 nest_adapters: bool,
943 ):
944 mapped_class_or_ac = inspected.entity
945 mapper = inspected.mapper
947 self._weak_entity = weakref.ref(entity)
948 self.mapper = mapper
949 self.selectable = self.persist_selectable = self.local_table = (
950 selectable
951 )
952 self.name = name
953 self.polymorphic_on = polymorphic_on
954 self._base_alias = weakref.ref(_base_alias or self)
955 self._use_mapper_path = _use_mapper_path
956 self.represents_outer_join = represents_outer_join
957 self._nest_adapters = nest_adapters
959 if with_polymorphic_mappers:
960 self._is_with_polymorphic = True
961 self.with_polymorphic_mappers = with_polymorphic_mappers
962 self._with_polymorphic_entities = []
963 for poly in self.with_polymorphic_mappers:
964 if poly is not mapper:
965 ent = AliasedClass(
966 poly.class_,
967 selectable,
968 base_alias=self,
969 adapt_on_names=adapt_on_names,
970 use_mapper_path=_use_mapper_path,
971 )
973 setattr(self.entity, poly.class_.__name__, ent)
974 self._with_polymorphic_entities.append(ent._aliased_insp)
976 else:
977 self._is_with_polymorphic = False
978 self.with_polymorphic_mappers = [mapper]
980 self._adapter = ORMAdapter(
981 _TraceAdaptRole.ALIASED_INSP,
982 mapper,
983 selectable=selectable,
984 equivalents=mapper._equivalent_columns,
985 adapt_on_names=adapt_on_names,
986 anonymize_labels=True,
987 # make sure the adapter doesn't try to grab other tables that
988 # are not even the thing we are mapping, such as embedded
989 # selectables in subqueries or CTEs. See issue #6060
990 adapt_from_selectables={
991 m.selectable
992 for m in self.with_polymorphic_mappers
993 if not adapt_on_names
994 },
995 limit_on_entity=False,
996 )
998 if nest_adapters:
999 # supports "aliased class of aliased class" use case
1000 assert isinstance(inspected, AliasedInsp)
1001 self._adapter = inspected._adapter.wrap(self._adapter)
1003 self._adapt_on_names = adapt_on_names
1004 self._target = mapped_class_or_ac
1006 @classmethod
1007 def _alias_factory(
1008 cls,
1009 element: Union[_EntityType[_O], FromClause],
1010 alias: Optional[FromClause] = None,
1011 name: Optional[str] = None,
1012 flat: bool = False,
1013 adapt_on_names: bool = False,
1014 ) -> Union[AliasedClass[_O], FromClause]:
1015 if isinstance(element, FromClause):
1016 if adapt_on_names:
1017 raise sa_exc.ArgumentError(
1018 "adapt_on_names only applies to ORM elements"
1019 )
1020 if name:
1021 return element.alias(name=name, flat=flat)
1022 else:
1023 return coercions.expect(
1024 roles.AnonymizedFromClauseRole, element, flat=flat
1025 )
1026 else:
1027 return AliasedClass(
1028 element,
1029 alias=alias,
1030 flat=flat,
1031 name=name,
1032 adapt_on_names=adapt_on_names,
1033 )
1035 @classmethod
1036 def _with_polymorphic_factory(
1037 cls,
1038 base: Union[Type[_O], Mapper[_O]],
1039 classes: Union[Literal["*"], Iterable[_EntityType[Any]]],
1040 selectable: Union[Literal[False, None], FromClause] = False,
1041 flat: bool = False,
1042 polymorphic_on: Optional[ColumnElement[Any]] = None,
1043 aliased: bool = False,
1044 innerjoin: bool = False,
1045 adapt_on_names: bool = False,
1046 name: Optional[str] = None,
1047 _use_mapper_path: bool = False,
1048 ) -> AliasedClass[_O]:
1049 primary_mapper = _class_to_mapper(base)
1051 if selectable not in (None, False) and flat:
1052 raise sa_exc.ArgumentError(
1053 "the 'flat' and 'selectable' arguments cannot be passed "
1054 "simultaneously to with_polymorphic()"
1055 )
1057 mappers, selectable = primary_mapper._with_polymorphic_args(
1058 classes, selectable, innerjoin=innerjoin
1059 )
1060 if aliased or flat:
1061 assert selectable is not None
1062 selectable = selectable._anonymous_fromclause(flat=flat)
1064 return AliasedClass(
1065 base,
1066 selectable,
1067 name=name,
1068 with_polymorphic_mappers=mappers,
1069 adapt_on_names=adapt_on_names,
1070 with_polymorphic_discriminator=polymorphic_on,
1071 use_mapper_path=_use_mapper_path,
1072 represents_outer_join=not innerjoin,
1073 )
1075 @property
1076 def entity(self) -> AliasedClass[_O]:
1077 # to eliminate reference cycles, the AliasedClass is held weakly.
1078 # this produces some situations where the AliasedClass gets lost,
1079 # particularly when one is created internally and only the AliasedInsp
1080 # is passed around.
1081 # to work around this case, we just generate a new one when we need
1082 # it, as it is a simple class with very little initial state on it.
1083 ent = self._weak_entity()
1084 if ent is None:
1085 ent = AliasedClass._reconstitute_from_aliased_insp(self)
1086 self._weak_entity = weakref.ref(ent)
1087 return ent
1089 is_aliased_class = True
1090 "always returns True"
1092 def _memoized_method___clause_element__(self) -> FromClause:
1093 return self.selectable._annotate(
1094 {
1095 "parentmapper": self.mapper,
1096 "parententity": self,
1097 "entity_namespace": self,
1098 }
1099 )._set_propagate_attrs(
1100 {"compile_state_plugin": "orm", "plugin_subject": self}
1101 )
1103 @property
1104 def entity_namespace(self) -> AliasedClass[_O]:
1105 return self.entity
1107 @property
1108 def class_(self) -> Type[_O]:
1109 """Return the mapped class ultimately represented by this
1110 :class:`.AliasedInsp`."""
1111 return self.mapper.class_
1113 @property
1114 def _path_registry(self) -> _AbstractEntityRegistry:
1115 if self._use_mapper_path:
1116 return self.mapper._path_registry
1117 else:
1118 return PathRegistry.per_mapper(self)
1120 def __getstate__(self) -> Dict[str, Any]:
1121 return {
1122 "entity": self.entity,
1123 "mapper": self.mapper,
1124 "alias": self.selectable,
1125 "name": self.name,
1126 "adapt_on_names": self._adapt_on_names,
1127 "with_polymorphic_mappers": self.with_polymorphic_mappers,
1128 "with_polymorphic_discriminator": self.polymorphic_on,
1129 "base_alias": self._base_alias(),
1130 "use_mapper_path": self._use_mapper_path,
1131 "represents_outer_join": self.represents_outer_join,
1132 "nest_adapters": self._nest_adapters,
1133 }
1135 def __setstate__(self, state: Dict[str, Any]) -> None:
1136 self.__init__( # type: ignore
1137 state["entity"],
1138 state["mapper"],
1139 state["alias"],
1140 state["name"],
1141 state["with_polymorphic_mappers"],
1142 state["with_polymorphic_discriminator"],
1143 state["base_alias"],
1144 state["use_mapper_path"],
1145 state["adapt_on_names"],
1146 state["represents_outer_join"],
1147 state["nest_adapters"],
1148 )
1150 def _merge_with(self, other: AliasedInsp[_O]) -> AliasedInsp[_O]:
1151 # assert self._is_with_polymorphic
1152 # assert other._is_with_polymorphic
1154 primary_mapper = other.mapper
1156 assert self.mapper is primary_mapper
1158 our_classes = util.to_set(
1159 mp.class_ for mp in self.with_polymorphic_mappers
1160 )
1161 new_classes = {mp.class_ for mp in other.with_polymorphic_mappers}
1162 if our_classes == new_classes:
1163 return other
1164 else:
1165 classes = our_classes.union(new_classes)
1167 mappers, selectable = primary_mapper._with_polymorphic_args(
1168 classes, None, innerjoin=not other.represents_outer_join
1169 )
1170 selectable = selectable._anonymous_fromclause(flat=True)
1171 return AliasedClass(
1172 primary_mapper,
1173 selectable,
1174 with_polymorphic_mappers=mappers,
1175 with_polymorphic_discriminator=other.polymorphic_on,
1176 use_mapper_path=other._use_mapper_path,
1177 represents_outer_join=other.represents_outer_join,
1178 )._aliased_insp
1180 def _adapt_element(
1181 self, expr: _ORMCOLEXPR, key: Optional[str] = None
1182 ) -> _ORMCOLEXPR:
1183 assert isinstance(expr, ColumnElement)
1184 d: Dict[str, Any] = {
1185 "parententity": self,
1186 "parentmapper": self.mapper,
1187 }
1188 if key:
1189 d["proxy_key"] = key
1191 # userspace adapt of an attribute from AliasedClass; validate that
1192 # it actually was present
1193 adapted = self._adapter.adapt_check_present(expr)
1194 if adapted is None:
1195 adapted = expr
1196 if self._adapter.adapt_on_names:
1197 util.warn_limited(
1198 "Did not locate an expression in selectable for "
1199 "attribute %r; ensure name is correct in expression",
1200 (key,),
1201 )
1202 else:
1203 util.warn_limited(
1204 "Did not locate an expression in selectable for "
1205 "attribute %r; to match by name, use the "
1206 "adapt_on_names parameter",
1207 (key,),
1208 )
1210 return adapted._annotate(d)._set_propagate_attrs(
1211 {"compile_state_plugin": "orm", "plugin_subject": self}
1212 )
1214 if TYPE_CHECKING:
1215 # establish compatibility with the _ORMAdapterProto protocol,
1216 # which in turn is compatible with _CoreAdapterProto.
1218 def _orm_adapt_element(
1219 self,
1220 obj: _CE,
1221 key: Optional[str] = None,
1222 ) -> _CE: ...
1224 else:
1225 _orm_adapt_element = _adapt_element
1227 def _entity_for_mapper(self, mapper):
1228 self_poly = self.with_polymorphic_mappers
1229 if mapper in self_poly:
1230 if mapper is self.mapper:
1231 return self
1232 else:
1233 return getattr(
1234 self.entity, mapper.class_.__name__
1235 )._aliased_insp
1236 elif mapper.isa(self.mapper):
1237 return self
1238 else:
1239 assert False, "mapper %s doesn't correspond to %s" % (mapper, self)
1241 def _memoized_attr__get_clause(self):
1242 onclause, replacemap = self.mapper._get_clause
1243 return (
1244 self._adapter.traverse(onclause),
1245 {
1246 self._adapter.traverse(col): param
1247 for col, param in replacemap.items()
1248 },
1249 )
1251 def _memoized_attr__memoized_values(self):
1252 return {}
1254 def _memoized_attr__all_column_expressions(self):
1255 if self._is_with_polymorphic:
1256 cols_plus_keys = self.mapper._columns_plus_keys(
1257 [ent.mapper for ent in self._with_polymorphic_entities]
1258 )
1259 else:
1260 cols_plus_keys = self.mapper._columns_plus_keys()
1262 cols_plus_keys = [
1263 (key, self._adapt_element(col)) for key, col in cols_plus_keys
1264 ]
1266 return ColumnCollection(cols_plus_keys)
1268 def _memo(self, key, callable_, *args, **kw):
1269 if key in self._memoized_values:
1270 return self._memoized_values[key]
1271 else:
1272 self._memoized_values[key] = value = callable_(*args, **kw)
1273 return value
1275 def __repr__(self):
1276 if self.with_polymorphic_mappers:
1277 with_poly = "(%s)" % ", ".join(
1278 mp.class_.__name__ for mp in self.with_polymorphic_mappers
1279 )
1280 else:
1281 with_poly = ""
1282 return "<AliasedInsp at 0x%x; %s%s>" % (
1283 id(self),
1284 self.class_.__name__,
1285 with_poly,
1286 )
1288 def __str__(self):
1289 if self._is_with_polymorphic:
1290 return "with_polymorphic(%s, [%s])" % (
1291 self._target.__name__,
1292 ", ".join(
1293 mp.class_.__name__
1294 for mp in self.with_polymorphic_mappers
1295 if mp is not self.mapper
1296 ),
1297 )
1298 else:
1299 return "aliased(%s)" % (self._target.__name__,)
1302class _WrapUserEntity:
1303 """A wrapper used within the loader_criteria lambda caller so that
1304 we can bypass declared_attr descriptors on unmapped mixins, which
1305 normally emit a warning for such use.
1307 might also be useful for other per-lambda instrumentations should
1308 the need arise.
1310 """
1312 __slots__ = ("subject",)
1314 def __init__(self, subject):
1315 self.subject = subject
1317 @util.preload_module("sqlalchemy.orm.decl_api")
1318 def __getattribute__(self, name):
1319 decl_api = util.preloaded.orm.decl_api
1321 subject = object.__getattribute__(self, "subject")
1322 if name in subject.__dict__ and isinstance(
1323 subject.__dict__[name], decl_api.declared_attr
1324 ):
1325 return subject.__dict__[name].fget(subject)
1326 else:
1327 return getattr(subject, name)
1330class LoaderCriteriaOption(CriteriaOption):
1331 """Add additional WHERE criteria to the load for all occurrences of
1332 a particular entity.
1334 :class:`_orm.LoaderCriteriaOption` is invoked using the
1335 :func:`_orm.with_loader_criteria` function; see that function for
1336 details.
1338 .. versionadded:: 1.4
1340 """
1342 __slots__ = (
1343 "root_entity",
1344 "entity",
1345 "deferred_where_criteria",
1346 "where_criteria",
1347 "_where_crit_orig",
1348 "include_aliases",
1349 "propagate_to_loaders",
1350 )
1352 _traverse_internals = [
1353 ("root_entity", visitors.ExtendedInternalTraversal.dp_plain_obj),
1354 ("entity", visitors.ExtendedInternalTraversal.dp_has_cache_key),
1355 ("where_criteria", visitors.InternalTraversal.dp_clauseelement),
1356 ("include_aliases", visitors.InternalTraversal.dp_boolean),
1357 ("propagate_to_loaders", visitors.InternalTraversal.dp_boolean),
1358 ]
1360 root_entity: Optional[Type[Any]]
1361 entity: Optional[_InternalEntityType[Any]]
1362 where_criteria: Union[ColumnElement[bool], lambdas.DeferredLambdaElement]
1363 deferred_where_criteria: bool
1364 include_aliases: bool
1365 propagate_to_loaders: bool
1367 _where_crit_orig: Any
1369 def __init__(
1370 self,
1371 entity_or_base: _EntityType[Any],
1372 where_criteria: Union[
1373 _ColumnExpressionArgument[bool],
1374 Callable[[Any], _ColumnExpressionArgument[bool]],
1375 ],
1376 loader_only: bool = False,
1377 include_aliases: bool = False,
1378 propagate_to_loaders: bool = True,
1379 track_closure_variables: bool = True,
1380 ):
1381 entity = cast(
1382 "_InternalEntityType[Any]",
1383 inspection.inspect(entity_or_base, False),
1384 )
1385 if entity is None:
1386 self.root_entity = cast("Type[Any]", entity_or_base)
1387 self.entity = None
1388 else:
1389 self.root_entity = None
1390 self.entity = entity
1392 self._where_crit_orig = where_criteria
1393 if callable(where_criteria):
1394 if self.root_entity is not None:
1395 wrap_entity = self.root_entity
1396 else:
1397 assert entity is not None
1398 wrap_entity = entity.entity
1400 self.deferred_where_criteria = True
1401 self.where_criteria = lambdas.DeferredLambdaElement(
1402 where_criteria,
1403 roles.WhereHavingRole,
1404 lambda_args=(_WrapUserEntity(wrap_entity),),
1405 opts=lambdas.LambdaOptions(
1406 track_closure_variables=track_closure_variables
1407 ),
1408 )
1409 else:
1410 self.deferred_where_criteria = False
1411 self.where_criteria = coercions.expect(
1412 roles.WhereHavingRole, where_criteria
1413 )
1415 self.include_aliases = include_aliases
1416 self.propagate_to_loaders = propagate_to_loaders
1418 @classmethod
1419 def _unreduce(
1420 cls, entity, where_criteria, include_aliases, propagate_to_loaders
1421 ):
1422 return LoaderCriteriaOption(
1423 entity,
1424 where_criteria,
1425 include_aliases=include_aliases,
1426 propagate_to_loaders=propagate_to_loaders,
1427 )
1429 def __reduce__(self):
1430 return (
1431 LoaderCriteriaOption._unreduce,
1432 (
1433 self.entity.class_ if self.entity else self.root_entity,
1434 self._where_crit_orig,
1435 self.include_aliases,
1436 self.propagate_to_loaders,
1437 ),
1438 )
1440 def _all_mappers(self) -> Iterator[Mapper[Any]]:
1441 if self.entity:
1442 yield from self.entity.mapper.self_and_descendants
1443 else:
1444 assert self.root_entity
1445 stack = list(self.root_entity.__subclasses__())
1446 while stack:
1447 subclass = stack.pop(0)
1448 ent = cast(
1449 "_InternalEntityType[Any]",
1450 inspection.inspect(subclass, raiseerr=False),
1451 )
1452 if ent:
1453 yield from ent.mapper.self_and_descendants
1454 else:
1455 stack.extend(subclass.__subclasses__())
1457 def _should_include(self, compile_state: _ORMCompileState) -> bool:
1458 if (
1459 compile_state.select_statement._annotations.get(
1460 "for_loader_criteria", None
1461 )
1462 is self
1463 ):
1464 return False
1465 return True
1467 def _resolve_where_criteria(
1468 self, ext_info: _InternalEntityType[Any]
1469 ) -> ColumnElement[bool]:
1470 if self.deferred_where_criteria:
1471 crit = cast(
1472 "ColumnElement[bool]",
1473 self.where_criteria._resolve_with_args(ext_info.entity),
1474 )
1475 else:
1476 crit = self.where_criteria # type: ignore
1477 assert isinstance(crit, ColumnElement)
1478 return sql_util._deep_annotate(
1479 crit,
1480 {"for_loader_criteria": self},
1481 detect_subquery_cols=True,
1482 ind_cols_on_fromclause=True,
1483 )
1485 def process_compile_state_replaced_entities(
1486 self,
1487 compile_state: _ORMCompileState,
1488 mapper_entities: Iterable[_MapperEntity],
1489 ) -> None:
1490 self.process_compile_state(compile_state)
1492 def process_compile_state(self, compile_state: _ORMCompileState) -> None:
1493 """Apply a modification to a given :class:`.CompileState`."""
1495 # if options to limit the criteria to immediate query only,
1496 # use compile_state.attributes instead
1498 self.get_global_criteria(compile_state.global_attributes)
1500 def get_global_criteria(self, attributes: Dict[Any, Any]) -> None:
1501 for mp in self._all_mappers():
1502 load_criteria = attributes.setdefault(
1503 ("additional_entity_criteria", mp), []
1504 )
1506 load_criteria.append(self)
1509inspection._inspects(AliasedClass)(lambda target: target._aliased_insp)
1512@inspection._inspects(type)
1513def _inspect_mc(
1514 class_: Type[_O],
1515) -> Optional[Mapper[_O]]:
1516 try:
1517 class_manager = opt_manager_of_class(class_)
1518 if class_manager is None or not class_manager.is_mapped:
1519 return None
1520 mapper = class_manager.mapper
1521 except orm_exc.NO_STATE:
1522 return None
1523 else:
1524 return mapper
1527GenericAlias = type(List[Any])
1530@inspection._inspects(GenericAlias)
1531def _inspect_generic_alias(
1532 class_: Type[_O],
1533) -> Optional[Mapper[_O]]:
1534 origin = cast("Type[_O]", get_origin(class_))
1535 return _inspect_mc(origin)
1538@inspection._self_inspects
1539class Bundle(
1540 ORMColumnsClauseRole[_T],
1541 SupportsCloneAnnotations,
1542 MemoizedHasCacheKey,
1543 inspection.Inspectable["Bundle[_T]"],
1544 InspectionAttr,
1545):
1546 """A grouping of SQL expressions that are returned by a :class:`.Query`
1547 under one namespace.
1549 The :class:`.Bundle` essentially allows nesting of the tuple-based
1550 results returned by a column-oriented :class:`_query.Query` object.
1551 It also
1552 is extensible via simple subclassing, where the primary capability
1553 to override is that of how the set of expressions should be returned,
1554 allowing post-processing as well as custom return types, without
1555 involving ORM identity-mapped classes.
1557 .. seealso::
1559 :ref:`bundles`
1561 :class:`.DictBundle`
1563 """
1565 single_entity = False
1566 """If True, queries for a single Bundle will be returned as a single
1567 entity, rather than an element within a keyed tuple."""
1569 is_clause_element = False
1571 is_mapper = False
1573 is_aliased_class = False
1575 is_bundle = True
1577 _propagate_attrs: _PropagateAttrsType = util.immutabledict()
1579 proxy_set = util.EMPTY_SET
1581 exprs: List[_ColumnsClauseElement]
1583 def __init__(
1584 self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
1585 ) -> None:
1586 r"""Construct a new :class:`.Bundle`.
1588 e.g.::
1590 bn = Bundle("mybundle", MyClass.x, MyClass.y)
1592 for row in session.query(bn).filter(bn.c.x == 5).filter(bn.c.y == 4):
1593 print(row.mybundle.x, row.mybundle.y)
1595 :param name: name of the bundle.
1596 :param \*exprs: columns or SQL expressions comprising the bundle.
1597 :param single_entity=False: if True, rows for this :class:`.Bundle`
1598 can be returned as a "single entity" outside of any enclosing tuple
1599 in the same manner as a mapped entity.
1601 """ # noqa: E501
1602 self.name = self._label = name
1603 coerced_exprs = [
1604 coercions.expect(
1605 roles.ColumnsClauseRole, expr, apply_propagate_attrs=self
1606 )
1607 for expr in exprs
1608 ]
1609 self.exprs = coerced_exprs
1611 self.c = self.columns = ColumnCollection(
1612 (getattr(col, "key", col._label), col)
1613 for col in [e._annotations.get("bundle", e) for e in coerced_exprs]
1614 ).as_readonly()
1615 self.single_entity = kw.pop("single_entity", self.single_entity)
1617 def _gen_cache_key(
1618 self, anon_map: anon_map, bindparams: List[BindParameter[Any]]
1619 ) -> Tuple[Any, ...]:
1620 return (self.__class__, self.name, self.single_entity) + tuple(
1621 [expr._gen_cache_key(anon_map, bindparams) for expr in self.exprs]
1622 )
1624 @property
1625 def mapper(self) -> Optional[Mapper[Any]]:
1626 mp: Optional[Mapper[Any]] = self.exprs[0]._annotations.get(
1627 "parentmapper", None
1628 )
1629 return mp
1631 @property
1632 def entity(self) -> Optional[_InternalEntityType[Any]]:
1633 ie: Optional[_InternalEntityType[Any]] = self.exprs[
1634 0
1635 ]._annotations.get("parententity", None)
1636 return ie
1638 @property
1639 def entity_namespace(
1640 self,
1641 ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]:
1642 return self.c
1644 columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
1646 """A namespace of SQL expressions referred to by this :class:`.Bundle`.
1648 e.g.::
1650 bn = Bundle("mybundle", MyClass.x, MyClass.y)
1652 q = sess.query(bn).filter(bn.c.x == 5)
1654 Nesting of bundles is also supported::
1656 b1 = Bundle(
1657 "b1",
1658 Bundle("b2", MyClass.a, MyClass.b),
1659 Bundle("b3", MyClass.x, MyClass.y),
1660 )
1662 q = sess.query(b1).filter(b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9)
1664 .. seealso::
1666 :attr:`.Bundle.c`
1668 """ # noqa: E501
1670 c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
1671 """An alias for :attr:`.Bundle.columns`."""
1673 def _clone(self, **kw):
1674 cloned = self.__class__.__new__(self.__class__)
1675 cloned.__dict__.update(self.__dict__)
1676 return cloned
1678 def __clause_element__(self):
1679 # ensure existing entity_namespace remains
1680 annotations = {"bundle": self, "entity_namespace": self}
1681 annotations.update(self._annotations)
1683 plugin_subject = self.exprs[0]._propagate_attrs.get(
1684 "plugin_subject", self.entity
1685 )
1686 return (
1687 expression.ClauseList(
1688 _literal_as_text_role=roles.ColumnsClauseRole,
1689 group=False,
1690 *[e._annotations.get("bundle", e) for e in self.exprs],
1691 )
1692 ._annotate(annotations)
1693 ._set_propagate_attrs(
1694 # the Bundle *must* use the orm plugin no matter what. the
1695 # subject can be None but it's much better if it's not.
1696 {
1697 "compile_state_plugin": "orm",
1698 "plugin_subject": plugin_subject,
1699 }
1700 )
1701 )
1703 @property
1704 def clauses(self):
1705 return self.__clause_element__().clauses
1707 def label(self, name):
1708 """Provide a copy of this :class:`.Bundle` passing a new label."""
1710 cloned = self._clone()
1711 cloned.name = name
1712 return cloned
1714 def create_row_processor(
1715 self,
1716 query: Select[Unpack[TupleAny]],
1717 procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]],
1718 labels: Sequence[str],
1719 ) -> Callable[[Row[Unpack[TupleAny]]], Any]:
1720 """Produce the "row processing" function for this :class:`.Bundle`.
1722 May be overridden by subclasses to provide custom behaviors when
1723 results are fetched. The method is passed the statement object and a
1724 set of "row processor" functions at query execution time; these
1725 processor functions when given a result row will return the individual
1726 attribute value, which can then be adapted into any kind of return data
1727 structure.
1729 The example below illustrates replacing the usual :class:`.Row`
1730 return structure with a straight Python dictionary::
1732 from sqlalchemy.orm import Bundle
1735 class DictBundle(Bundle):
1736 def create_row_processor(self, query, procs, labels):
1737 "Override create_row_processor to return values as dictionaries"
1739 def proc(row):
1740 return dict(zip(labels, (proc(row) for proc in procs)))
1742 return proc
1744 A result from the above :class:`_orm.Bundle` will return dictionary
1745 values::
1747 bn = DictBundle("mybundle", MyClass.data1, MyClass.data2)
1748 for row in session.execute(select(bn)).where(bn.c.data1 == "d1"):
1749 print(row.mybundle["data1"], row.mybundle["data2"])
1751 The above example is available natively using :class:`.DictBundle`
1753 .. seealso::
1755 :class:`.DictBundle`
1757 """ # noqa: E501
1758 keyed_tuple = result_tuple(labels, [() for l in labels])
1760 def proc(row: Row[Unpack[TupleAny]]) -> Any:
1761 return keyed_tuple([proc(row) for proc in procs])
1763 return proc
1766class DictBundle(Bundle[_T]):
1767 """Like :class:`.Bundle` but returns ``dict`` instances instead of
1768 named tuple like objects::
1770 bn = DictBundle("mybundle", MyClass.data1, MyClass.data2)
1771 for row in session.execute(select(bn)).where(bn.c.data1 == "d1"):
1772 print(row.mybundle["data1"], row.mybundle["data2"])
1774 Differently from :class:`.Bundle`, multiple columns with the same name are
1775 not supported.
1777 .. versionadded:: 2.1
1779 .. seealso::
1781 :ref:`bundles`
1783 :class:`.Bundle`
1784 """
1786 def __init__(
1787 self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
1788 ) -> None:
1789 super().__init__(name, *exprs, **kw)
1790 if len(set(self.c.keys())) != len(self.c):
1791 raise sa_exc.ArgumentError(
1792 "DictBundle does not support duplicate column names"
1793 )
1795 def create_row_processor(
1796 self,
1797 query: Select[Unpack[TupleAny]],
1798 procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]],
1799 labels: Sequence[str],
1800 ) -> Callable[[Row[Unpack[TupleAny]]], dict[str, Any]]:
1801 def proc(row: Row[Unpack[TupleAny]]) -> dict[str, Any]:
1802 return dict(zip(labels, (proc(row) for proc in procs)))
1804 return proc
1807def _orm_full_deannotate(element: _SA) -> _SA:
1808 return sql_util._deep_deannotate(element)
1811class _ORMJoin(expression.Join):
1812 """Extend Join to support ORM constructs as input."""
1814 __visit_name__ = expression.Join.__visit_name__
1816 inherit_cache = True
1818 def __init__(
1819 self,
1820 left: _FromClauseArgument,
1821 right: _FromClauseArgument,
1822 onclause: Optional[_OnClauseArgument] = None,
1823 isouter: bool = False,
1824 full: bool = False,
1825 _left_memo: Optional[Any] = None,
1826 _right_memo: Optional[Any] = None,
1827 _extra_criteria: Tuple[ColumnElement[bool], ...] = (),
1828 ):
1829 left_info = cast(
1830 "Union[FromClause, _InternalEntityType[Any]]",
1831 inspection.inspect(left),
1832 )
1834 right_info = cast(
1835 "Union[FromClause, _InternalEntityType[Any]]",
1836 inspection.inspect(right),
1837 )
1838 adapt_to = right_info.selectable
1840 # used by joined eager loader
1841 self._left_memo = _left_memo
1842 self._right_memo = _right_memo
1844 if isinstance(onclause, attributes.QueryableAttribute):
1845 if TYPE_CHECKING:
1846 assert isinstance(
1847 onclause.comparator, RelationshipProperty.Comparator
1848 )
1849 on_selectable = onclause.comparator._source_selectable()
1850 prop = onclause.property
1851 _extra_criteria += onclause._extra_criteria
1852 elif isinstance(onclause, MapperProperty):
1853 # used internally by joined eager loader...possibly not ideal
1854 prop = onclause
1855 on_selectable = prop.parent.selectable
1856 else:
1857 prop = None
1858 on_selectable = None
1860 left_selectable = left_info.selectable
1861 if prop:
1862 adapt_from: Optional[FromClause]
1863 if sql_util.clause_is_present(on_selectable, left_selectable):
1864 adapt_from = on_selectable
1865 else:
1866 assert isinstance(left_selectable, FromClause)
1867 adapt_from = left_selectable
1869 (
1870 pj,
1871 sj,
1872 source,
1873 dest,
1874 secondary,
1875 target_adapter,
1876 ) = prop._create_joins(
1877 source_selectable=adapt_from,
1878 dest_selectable=adapt_to,
1879 source_polymorphic=True,
1880 of_type_entity=right_info,
1881 alias_secondary=True,
1882 extra_criteria=_extra_criteria,
1883 )
1885 if sj is not None:
1886 if isouter:
1887 # note this is an inner join from secondary->right
1888 right = sql.join(secondary, right, sj)
1889 onclause = pj
1890 else:
1891 left = sql.join(left, secondary, pj, isouter)
1892 onclause = sj
1893 else:
1894 onclause = pj
1896 self._target_adapter = target_adapter
1898 # we don't use the normal coercions logic for _ORMJoin
1899 # (probably should), so do some gymnastics to get the entity.
1900 # logic here is for #8721, which was a major bug in 1.4
1901 # for almost two years, not reported/fixed until 1.4.43 (!)
1902 if is_selectable(left_info):
1903 parententity = left_selectable._annotations.get(
1904 "parententity", None
1905 )
1906 elif insp_is_mapper(left_info) or insp_is_aliased_class(left_info):
1907 parententity = left_info
1908 else:
1909 parententity = None
1911 if parententity is not None:
1912 self._annotations = self._annotations.union(
1913 {"parententity": parententity}
1914 )
1916 augment_onclause = bool(_extra_criteria) and not prop
1917 expression.Join.__init__(self, left, right, onclause, isouter, full)
1919 assert self.onclause is not None
1921 if augment_onclause:
1922 self.onclause &= sql.and_(*_extra_criteria)
1924 if (
1925 not prop
1926 and getattr(right_info, "mapper", None)
1927 and right_info.mapper.single # type: ignore
1928 ):
1929 right_info = cast("_InternalEntityType[Any]", right_info)
1930 # if single inheritance target and we are using a manual
1931 # or implicit ON clause, augment it the same way we'd augment the
1932 # WHERE.
1933 single_crit = right_info.mapper._single_table_criterion
1934 if single_crit is not None:
1935 if insp_is_aliased_class(right_info):
1936 single_crit = right_info._adapter.traverse(single_crit)
1937 self.onclause = self.onclause & single_crit
1939 def _splice_into_center(self, other):
1940 """Splice a join into the center.
1942 Given join(a, b) and join(b, c), return join(a, b).join(c)
1944 """
1945 leftmost = other
1946 while isinstance(leftmost, sql.Join):
1947 leftmost = leftmost.left
1949 assert self.right is leftmost
1951 left = _ORMJoin(
1952 self.left,
1953 other.left,
1954 self.onclause,
1955 isouter=self.isouter,
1956 _left_memo=self._left_memo,
1957 _right_memo=other._left_memo._path_registry,
1958 )
1960 return _ORMJoin(
1961 left,
1962 other.right,
1963 other.onclause,
1964 isouter=other.isouter,
1965 _right_memo=other._right_memo,
1966 )
1968 def join(
1969 self,
1970 right: _FromClauseArgument,
1971 onclause: Optional[_OnClauseArgument] = None,
1972 isouter: bool = False,
1973 full: bool = False,
1974 ) -> _ORMJoin:
1975 return _ORMJoin(self, right, onclause, full=full, isouter=isouter)
1977 def outerjoin(
1978 self,
1979 right: _FromClauseArgument,
1980 onclause: Optional[_OnClauseArgument] = None,
1981 full: bool = False,
1982 ) -> _ORMJoin:
1983 return _ORMJoin(self, right, onclause, isouter=True, full=full)
1986def with_parent(
1987 instance: object,
1988 prop: attributes.QueryableAttribute[Any],
1989 from_entity: Optional[_EntityType[Any]] = None,
1990) -> ColumnElement[bool]:
1991 """Create filtering criterion that relates this query's primary entity
1992 to the given related instance, using established
1993 :func:`_orm.relationship()`
1994 configuration.
1996 E.g.::
1998 stmt = select(Address).where(with_parent(some_user, User.addresses))
2000 The SQL rendered is the same as that rendered when a lazy loader
2001 would fire off from the given parent on that attribute, meaning
2002 that the appropriate state is taken from the parent object in
2003 Python without the need to render joins to the parent table
2004 in the rendered statement.
2006 The given property may also make use of :meth:`_orm.PropComparator.of_type`
2007 to indicate the left side of the criteria::
2010 a1 = aliased(Address)
2011 a2 = aliased(Address)
2012 stmt = select(a1, a2).where(with_parent(u1, User.addresses.of_type(a2)))
2014 The above use is equivalent to using the
2015 :func:`_orm.with_parent.from_entity` argument::
2017 a1 = aliased(Address)
2018 a2 = aliased(Address)
2019 stmt = select(a1, a2).where(
2020 with_parent(u1, User.addresses, from_entity=a2)
2021 )
2023 :param instance:
2024 An instance which has some :func:`_orm.relationship`.
2026 :param property:
2027 Class-bound attribute, which indicates
2028 what relationship from the instance should be used to reconcile the
2029 parent/child relationship.
2031 :param from_entity:
2032 Entity in which to consider as the left side. This defaults to the
2033 "zero" entity of the :class:`_query.Query` itself.
2035 """ # noqa: E501
2036 prop_t: RelationshipProperty[Any]
2038 if isinstance(prop, str):
2039 raise sa_exc.ArgumentError(
2040 "with_parent() accepts class-bound mapped attributes, not strings"
2041 )
2042 elif isinstance(prop, attributes.QueryableAttribute):
2043 if prop._of_type:
2044 from_entity = prop._of_type
2045 mapper_property = prop.property
2046 if mapper_property is None or not prop_is_relationship(
2047 mapper_property
2048 ):
2049 raise sa_exc.ArgumentError(
2050 f"Expected relationship property for with_parent(), "
2051 f"got {mapper_property}"
2052 )
2053 prop_t = mapper_property
2054 else:
2055 prop_t = prop
2057 return prop_t._with_parent(instance, from_entity=from_entity)
2060def has_identity(object_: object) -> bool:
2061 """Return True if the given object has a database
2062 identity.
2064 This typically corresponds to the object being
2065 in either the persistent or detached state.
2067 .. seealso::
2069 :func:`.was_deleted`
2071 """
2072 state = attributes.instance_state(object_)
2073 return state.has_identity
2076def was_deleted(object_: object) -> bool:
2077 """Return True if the given object was deleted
2078 within a session flush.
2080 This is regardless of whether or not the object is
2081 persistent or detached.
2083 .. seealso::
2085 :attr:`.InstanceState.was_deleted`
2087 """
2089 state = attributes.instance_state(object_)
2090 return state.was_deleted
2093def _entity_corresponds_to(
2094 given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
2095) -> bool:
2096 """determine if 'given' corresponds to 'entity', in terms
2097 of an entity passed to Query that would match the same entity
2098 being referred to elsewhere in the query.
2100 """
2101 if insp_is_aliased_class(entity):
2102 if insp_is_aliased_class(given):
2103 if entity._base_alias() is given._base_alias():
2104 return True
2105 return False
2106 elif insp_is_aliased_class(given):
2107 if given._use_mapper_path:
2108 return entity in given.with_polymorphic_mappers
2109 else:
2110 return entity is given
2112 assert insp_is_mapper(given)
2113 return entity.common_parent(given)
2116def _entity_corresponds_to_use_path_impl(
2117 given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
2118) -> bool:
2119 """determine if 'given' corresponds to 'entity', in terms
2120 of a path of loader options where a mapped attribute is taken to
2121 be a member of a parent entity.
2123 e.g.::
2125 someoption(A).someoption(A.b) # -> fn(A, A) -> True
2126 someoption(A).someoption(C.d) # -> fn(A, C) -> False
2128 a1 = aliased(A)
2129 someoption(a1).someoption(A.b) # -> fn(a1, A) -> False
2130 someoption(a1).someoption(a1.b) # -> fn(a1, a1) -> True
2132 wp = with_polymorphic(A, [A1, A2])
2133 someoption(wp).someoption(A1.foo) # -> fn(wp, A1) -> False
2134 someoption(wp).someoption(wp.A1.foo) # -> fn(wp, wp.A1) -> True
2136 """
2137 if insp_is_aliased_class(given):
2138 return (
2139 insp_is_aliased_class(entity)
2140 and not entity._use_mapper_path
2141 and (given is entity or entity in given._with_polymorphic_entities)
2142 )
2143 elif not insp_is_aliased_class(entity):
2144 return given.isa(entity.mapper)
2145 else:
2146 return (
2147 entity._use_mapper_path
2148 and given in entity.with_polymorphic_mappers
2149 )
2152def _entity_isa(given: _InternalEntityType[Any], mapper: Mapper[Any]) -> bool:
2153 """determine if 'given' "is a" mapper, in terms of the given
2154 would load rows of type 'mapper'.
2156 """
2157 if given.is_aliased_class:
2158 return mapper in given.with_polymorphic_mappers or given.mapper.isa(
2159 mapper
2160 )
2161 elif given.with_polymorphic_mappers:
2162 return mapper in given.with_polymorphic_mappers or given.isa(mapper)
2163 else:
2164 return given.isa(mapper)
2167def _getitem(iterable_query: Query[Any], item: Any) -> Any:
2168 """calculate __getitem__ in terms of an iterable query object
2169 that also has a slice() method.
2171 """
2173 def _no_negative_indexes():
2174 raise IndexError(
2175 "negative indexes are not accepted by SQL "
2176 "index / slice operators"
2177 )
2179 if isinstance(item, slice):
2180 start, stop, step = util.decode_slice(item)
2182 if (
2183 isinstance(stop, int)
2184 and isinstance(start, int)
2185 and stop - start <= 0
2186 ):
2187 return []
2189 elif (isinstance(start, int) and start < 0) or (
2190 isinstance(stop, int) and stop < 0
2191 ):
2192 _no_negative_indexes()
2194 res = iterable_query.slice(start, stop)
2195 if step is not None:
2196 return list(res)[None : None : item.step]
2197 else:
2198 return list(res)
2199 else:
2200 if item == -1:
2201 _no_negative_indexes()
2202 else:
2203 return list(iterable_query[item : item + 1])[0]
2206def _is_mapped_annotation(
2207 raw_annotation: _AnnotationScanType,
2208 cls: Type[Any],
2209 originating_cls: Type[Any],
2210) -> bool:
2211 try:
2212 annotated = de_stringify_annotation(
2213 cls, raw_annotation, originating_cls.__module__
2214 )
2215 except NameError:
2216 # in most cases, at least within our own tests, we can raise
2217 # here, which is more accurate as it prevents us from returning
2218 # false negatives. However, in the real world, try to avoid getting
2219 # involved with end-user annotations that have nothing to do with us.
2220 # see issue #8888 where we bypass using this function in the case
2221 # that we want to detect an unresolvable Mapped[] type.
2222 return False
2223 else:
2224 return is_origin_of_cls(annotated, _MappedAnnotationBase)
2227class _CleanupError(Exception):
2228 pass
2231def _cleanup_mapped_str_annotation(
2232 annotation: str, originating_module: str
2233) -> str:
2234 # fix up an annotation that comes in as the form:
2235 # 'Mapped[List[Address]]' so that it instead looks like:
2236 # 'Mapped[List["Address"]]' , which will allow us to get
2237 # "Address" as a string
2239 # additionally, resolve symbols for these names since this is where
2240 # we'd have to do it
2242 inner: Optional[Match[str]]
2244 mm = re.match(r"^([^ \|]+?)\[(.+)\]$", annotation)
2246 if not mm:
2247 return annotation
2249 # ticket #8759. Resolve the Mapped name to a real symbol.
2250 # originally this just checked the name.
2251 try:
2252 obj = eval_name_only(mm.group(1), originating_module)
2253 except NameError as ne:
2254 raise _CleanupError(
2255 f'For annotation "{annotation}", could not resolve '
2256 f'container type "{mm.group(1)}". '
2257 "Please ensure this type is imported at the module level "
2258 "outside of TYPE_CHECKING blocks"
2259 ) from ne
2261 if obj is typing.ClassVar:
2262 real_symbol = "ClassVar"
2263 else:
2264 try:
2265 if issubclass(obj, _MappedAnnotationBase):
2266 real_symbol = obj.__name__
2267 else:
2268 return annotation
2269 except TypeError:
2270 # avoid isinstance(obj, type) check, just catch TypeError
2271 return annotation
2273 # note: if one of the codepaths above didn't define real_symbol and
2274 # then didn't return, real_symbol raises UnboundLocalError
2275 # which is actually a NameError, and the calling routines don't
2276 # notice this since they are catching NameError anyway. Just in case
2277 # this is being modified in the future, something to be aware of.
2279 stack = []
2280 inner = mm
2281 while True:
2282 stack.append(real_symbol if mm is inner else inner.group(1))
2283 g2 = inner.group(2)
2284 inner = re.match(r"^([^ \|]+?)\[(.+)\]$", g2)
2285 if inner is None:
2286 stack.append(g2)
2287 break
2289 # stacks we want to rewrite, that is, quote the last entry which
2290 # we think is a relationship class name:
2291 #
2292 # ['Mapped', 'List', 'Address']
2293 # ['Mapped', 'A']
2294 #
2295 # stacks we dont want to rewrite, which are generally MappedColumn
2296 # use cases:
2297 #
2298 # ['Mapped', "'Optional[Dict[str, str]]'"]
2299 # ['Mapped', 'dict[str, str] | None']
2301 if (
2302 # avoid already quoted symbols such as
2303 # ['Mapped', "'Optional[Dict[str, str]]'"]
2304 not re.match(r"""^["'].*["']$""", stack[-1])
2305 # avoid further generics like Dict[] such as
2306 # ['Mapped', 'dict[str, str] | None'],
2307 # ['Mapped', 'list[int] | list[str]'],
2308 # ['Mapped', 'Union[list[int], list[str]]'],
2309 and not re.search(r"[\[\]]", stack[-1])
2310 ):
2311 stripchars = "\"' "
2312 stack[-1] = ", ".join(
2313 f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",")
2314 )
2316 annotation = "[".join(stack) + ("]" * (len(stack) - 1))
2318 return annotation
2321def _extract_mapped_subtype(
2322 raw_annotation: Optional[_AnnotationScanType],
2323 cls: type,
2324 originating_module: str,
2325 key: str,
2326 attr_cls: Type[Any],
2327 required: bool,
2328 is_dataclass_field: bool,
2329 expect_mapped: bool = True,
2330 raiseerr: bool = True,
2331) -> Optional[Tuple[Union[_AnnotationScanType, str], Optional[type]]]:
2332 """given an annotation, figure out if it's ``Mapped[something]`` and if
2333 so, return the ``something`` part.
2335 Includes error raise scenarios and other options.
2337 """
2339 if raw_annotation is None:
2340 if required:
2341 raise orm_exc.MappedAnnotationError(
2342 f"Python typing annotation is required for attribute "
2343 f'"{cls.__name__}.{key}" when primary argument(s) for '
2344 f'"{attr_cls.__name__}" construct are None or not present'
2345 )
2346 return None
2348 try:
2349 # destringify the "outside" of the annotation. note we are not
2350 # adding include_generic so it will *not* dig into generic contents,
2351 # which will remain as ForwardRef or plain str under future annotations
2352 # mode. The full destringify happens later when mapped_column goes
2353 # to do a full lookup in the registry type_annotations_map.
2354 annotated = de_stringify_annotation(
2355 cls,
2356 raw_annotation,
2357 originating_module,
2358 str_cleanup_fn=_cleanup_mapped_str_annotation,
2359 )
2360 except _CleanupError as ce:
2361 raise orm_exc.MappedAnnotationError(
2362 f"Could not interpret annotation {raw_annotation}. "
2363 "Check that it uses names that are correctly imported at the "
2364 "module level. See chained stack trace for more hints."
2365 ) from ce
2366 except NameError as ne:
2367 if raiseerr and "Mapped[" in raw_annotation: # type: ignore
2368 raise orm_exc.MappedAnnotationError(
2369 f"Could not interpret annotation {raw_annotation}. "
2370 "Check that it uses names that are correctly imported at the "
2371 "module level. See chained stack trace for more hints."
2372 ) from ne
2374 annotated = raw_annotation # type: ignore
2376 if is_dataclass_field:
2377 return annotated, None
2378 else:
2379 if not hasattr(annotated, "__origin__") or not is_origin_of_cls(
2380 annotated, _MappedAnnotationBase
2381 ):
2382 if expect_mapped:
2383 if not raiseerr:
2384 return None
2386 origin = getattr(annotated, "__origin__", None)
2387 if origin is typing.ClassVar:
2388 return None
2390 # check for other kind of ORM descriptor like AssociationProxy,
2391 # don't raise for that (issue #9957)
2392 elif isinstance(origin, type) and issubclass(
2393 origin, ORMDescriptor
2394 ):
2395 return None
2397 raise orm_exc.MappedAnnotationError(
2398 f'Type annotation for "{cls.__name__}.{key}" '
2399 "can't be correctly interpreted for "
2400 "Annotated Declarative Table form. ORM annotations "
2401 "should normally make use of the ``Mapped[]`` generic "
2402 "type, or other ORM-compatible generic type, as a "
2403 "container for the actual type, which indicates the "
2404 "intent that the attribute is mapped. "
2405 "Class variables that are not intended to be mapped "
2406 "by the ORM should use ClassVar[]. "
2407 "To allow Annotated Declarative to disregard legacy "
2408 "annotations which don't use Mapped[] to pass, set "
2409 '"__allow_unmapped__ = True" on the class or a '
2410 "superclass this class.",
2411 code="zlpr",
2412 )
2414 else:
2415 return annotated, None
2417 generic_annotated = cast(GenericProtocol[Any], annotated)
2418 if len(generic_annotated.__args__) != 1:
2419 raise orm_exc.MappedAnnotationError(
2420 "Expected sub-type for Mapped[] annotation"
2421 )
2423 return (
2424 # fix dict/list/set args to be ForwardRef, see #11814
2425 fixup_container_fwd_refs(generic_annotated.__args__[0]),
2426 generic_annotated.__origin__,
2427 )
2430def _mapper_property_as_plain_name(prop: Type[Any]) -> str:
2431 if hasattr(prop, "_mapper_property_name"):
2432 name = prop._mapper_property_name()
2433 else:
2434 name = None
2435 return util.clsname_as_plain_name(prop, name)