Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/util.py: 33%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# orm/util.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
9from __future__ import annotations
11import enum
12import functools
13import re
14import types
15import typing
16from typing import AbstractSet
17from typing import Any
18from typing import Callable
19from typing import cast
20from typing import Dict
21from typing import FrozenSet
22from typing import Generic
23from typing import Iterable
24from typing import Iterator
25from typing import List
26from typing import Match
27from typing import Optional
28from typing import Protocol
29from typing import Sequence
30from typing import Tuple
31from typing import Type
32from typing import TYPE_CHECKING
33from typing import TypeVar
34from typing import Union
35import weakref
37from . import attributes # noqa
38from . import exc
39from ._typing import _O
40from ._typing import insp_is_aliased_class
41from ._typing import insp_is_mapper
42from ._typing import prop_is_relationship
43from .base import _class_to_mapper as _class_to_mapper
44from .base import _MappedAnnotationBase
45from .base import _never_set as _never_set # noqa: F401
46from .base import _none_set as _none_set # noqa: F401
47from .base import attribute_str as attribute_str # noqa: F401
48from .base import class_mapper as class_mapper
49from .base import DynamicMapped
50from .base import InspectionAttr as InspectionAttr
51from .base import instance_str as instance_str # noqa: F401
52from .base import Mapped
53from .base import object_mapper as object_mapper
54from .base import object_state as object_state # noqa: F401
55from .base import opt_manager_of_class
56from .base import ORMDescriptor
57from .base import state_attribute_str as state_attribute_str # noqa: F401
58from .base import state_class_str as state_class_str # noqa: F401
59from .base import state_str as state_str # noqa: F401
60from .base import WriteOnlyMapped
61from .interfaces import CriteriaOption
62from .interfaces import MapperProperty as MapperProperty
63from .interfaces import ORMColumnsClauseRole
64from .interfaces import ORMEntityColumnsClauseRole
65from .interfaces import ORMFromClauseRole
66from .path_registry import PathRegistry as PathRegistry
67from .. import event
68from .. import exc as sa_exc
69from .. import inspection
70from .. import sql
71from .. import util
72from ..engine.result import result_tuple
73from ..sql import coercions
74from ..sql import expression
75from ..sql import lambdas
76from ..sql import roles
77from ..sql import util as sql_util
78from ..sql import visitors
79from ..sql._typing import is_selectable
80from ..sql.annotation import SupportsCloneAnnotations
81from ..sql.base import ColumnCollection
82from ..sql.cache_key import HasCacheKey
83from ..sql.cache_key import MemoizedHasCacheKey
84from ..sql.elements import ColumnElement
85from ..sql.elements import KeyedColumnElement
86from ..sql.selectable import FromClause
87from ..util.langhelpers import MemoizedSlots
88from ..util.typing import de_stringify_annotation as _de_stringify_annotation
89from ..util.typing import (
90 de_stringify_union_elements as _de_stringify_union_elements,
91)
92from ..util.typing import eval_name_only as _eval_name_only
93from ..util.typing import fixup_container_fwd_refs
94from ..util.typing import is_origin_of_cls
95from ..util.typing import Literal
96from ..util.typing import TupleAny
97from ..util.typing import typing_get_origin
98from ..util.typing import Unpack
100if typing.TYPE_CHECKING:
101 from ._typing import _EntityType
102 from ._typing import _IdentityKeyType
103 from ._typing import _InternalEntityType
104 from ._typing import _ORMCOLEXPR
105 from .context import _MapperEntity
106 from .context import ORMCompileState
107 from .mapper import Mapper
108 from .path_registry import AbstractEntityRegistry
109 from .query import Query
110 from .relationships import RelationshipProperty
111 from ..engine import Row
112 from ..engine import RowMapping
113 from ..sql._typing import _CE
114 from ..sql._typing import _ColumnExpressionArgument
115 from ..sql._typing import _EquivalentColumnMap
116 from ..sql._typing import _FromClauseArgument
117 from ..sql._typing import _OnClauseArgument
118 from ..sql._typing import _PropagateAttrsType
119 from ..sql.annotation import _SA
120 from ..sql.base import ReadOnlyColumnCollection
121 from ..sql.elements import BindParameter
122 from ..sql.selectable import _ColumnsClauseElement
123 from ..sql.selectable import Select
124 from ..sql.selectable import Selectable
125 from ..sql.visitors import anon_map
126 from ..util.typing import _AnnotationScanType
127 from ..util.typing import ArgsTypeProcotol
# Generic type variable used by annotations in this module.
_T = TypeVar("_T", bound=Any)

# Every name accepted as a cascade option, including the "all" and "none"
# shorthands that CascadeOptions expands / clears.
all_cascades = frozenset(
    {
        "all",
        "none",
        "save-update",
        "merge",
        "refresh-expire",
        "expunge",
        "delete",
        "delete-orphan",
    }
)
# A partial *of* functools.partial: calling it with a function returns that
# function with the ORM ``Mapped`` annotation classes pre-bound as its
# ``locals_`` keyword argument.
_de_stringify_partial = functools.partial(
    functools.partial,
    locals_=util.immutabledict(
        dict(
            Mapped=Mapped,
            WriteOnlyMapped=WriteOnlyMapped,
            DynamicMapped=DynamicMapped,
        )
    ),
)
# partial does not save much here: the complete signature still has to be
# written out, and kept in sync by hand, as a Protocol.


class _DeStringifyAnnotation(Protocol):
    """Call signature that ``de_stringify_annotation`` is cast to."""

    def __call__(
        self,
        cls: Type[Any],
        annotation: _AnnotationScanType,
        originating_module: str,
        *,
        str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
        include_generic: bool = False,
    ) -> Type[Any]:
        ...


de_stringify_annotation = cast(
    _DeStringifyAnnotation,
    _de_stringify_partial(_de_stringify_annotation),
)
class _DeStringifyUnionElements(Protocol):
    """Call signature that ``de_stringify_union_elements`` is cast to."""

    def __call__(
        self,
        cls: Type[Any],
        annotation: ArgsTypeProcotol,
        originating_module: str,
        *,
        str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
    ) -> Type[Any]:
        ...


de_stringify_union_elements = cast(
    _DeStringifyUnionElements,
    _de_stringify_partial(_de_stringify_union_elements),
)
class _EvalNameOnly(Protocol):
    """Call signature that ``eval_name_only`` is cast to."""

    def __call__(self, name: str, module_name: str) -> Any:
        ...


eval_name_only = cast(
    _EvalNameOnly,
    _de_stringify_partial(_eval_name_only),
)
class CascadeOptions(FrozenSet[str]):
    """Frozen set of the cascade names given to
    :paramref:`.relationship.cascade`.

    Besides behaving as a set of strings, each individual cascade is
    exposed as a boolean attribute (``save_update``, ``delete``, ...).
    """

    # what the "all" shorthand expands to
    _add_w_all_cascades = all_cascades - {"all", "none", "delete-orphan"}
    _allowed_cascades = all_cascades

    _viewonly_cascades = ["expunge", "all", "none", "refresh-expire", "merge"]

    __slots__ = (
        "save_update",
        "delete",
        "refresh_expire",
        "merge",
        "expunge",
        "delete_orphan",
    )

    save_update: bool
    delete: bool
    refresh_expire: bool
    merge: bool
    expunge: bool
    delete_orphan: bool

    def __new__(
        cls, value_list: Optional[Union[Iterable[str], str]]
    ) -> CascadeOptions:
        """Build the option set from an iterable of names, a
        comma-separated string, or ``None``.

        :raises sqlalchemy.exc.ArgumentError: if a name is not one of the
         allowed cascades.
        """
        # strings and None are parsed by from_string(), which calls back
        # into this constructor with a list
        if value_list is None or isinstance(value_list, str):
            return cls.from_string(value_list)  # type: ignore

        requested = set(value_list)
        unknown = requested.difference(cls._allowed_cascades)
        if unknown:
            raise sa_exc.ArgumentError(
                "Invalid cascade option(s): %s"
                % ", ".join(repr(name) for name in sorted(unknown))
            )

        # resolve the shorthands; neither "all" nor "none" is stored
        if "all" in requested:
            requested.update(cls._add_w_all_cascades)
        if "none" in requested:
            requested.clear()
        requested.discard("all")

        self = super().__new__(cls, requested)
        self.save_update = "save-update" in requested
        self.delete = "delete" in requested
        self.refresh_expire = "refresh-expire" in requested
        self.merge = "merge" in requested
        self.expunge = "expunge" in requested
        self.delete_orphan = "delete-orphan" in requested

        if self.delete_orphan and not self.delete:
            util.warn("The 'delete-orphan' cascade option requires 'delete'.")
        return self

    def __repr__(self):
        joined = ",".join(sorted(self))
        return "CascadeOptions(%r)" % joined

    @classmethod
    def from_string(cls, arg):
        """Parse a comma-separated string of cascade names; ``None`` or an
        empty string produce an empty option set."""
        tokens = re.split(r"\s*,\s*", arg or "")
        return cls(list(filter(None, tokens)))
def _validator_events(desc, key, validator, include_removes, include_backrefs):
    """Register attribute-event listeners on ``desc`` that pass incoming
    values through ``validator``.

    "append", "bulk_replace" and "set" listeners are always registered; a
    "remove" listener is added only when ``include_removes`` is set.  In
    that mode the validator is called with a trailing boolean which is
    ``True`` for removals and ``False`` otherwise.  When
    ``include_backrefs`` is false, an event whose initiator uses a different
    attribute impl than the one for ``key`` on the target is passed through
    without validation.
    """

    if include_backrefs:

        def should_validate(state, initiator):
            return True

    else:

        def should_validate(state, initiator):
            # validate only events initiated by this attribute's own impl
            impl = state.manager[key].impl
            return initiator.impl is impl

    if include_removes:

        def validate(obj, value):
            return validator(obj, key, value, False)

    else:

        def validate(obj, value):
            return validator(obj, key, value)

    def append(state, value, initiator):
        # appends that are part of a bulk replace are handled by bulk_set
        if initiator.op is attributes.OP_BULK_REPLACE:
            return value
        if not should_validate(state, initiator):
            return value
        return validate(state.obj(), value)

    def bulk_set(state, values, initiator):
        if should_validate(state, initiator):
            obj = state.obj()
            # replace the contents of the caller's list in place
            values[:] = [validate(obj, value) for value in values]

    def set_(state, value, oldvalue, initiator):
        if should_validate(state, initiator):
            return validate(state.obj(), value)
        return value

    event.listen(desc, "append", append, raw=True, retval=True)
    event.listen(desc, "bulk_replace", bulk_set, raw=True)
    event.listen(desc, "set", set_, raw=True, retval=True)

    if include_removes:

        def remove(state, value, initiator):
            if should_validate(state, initiator):
                validator(state.obj(), key, value, True)

        event.listen(desc, "remove", remove, raw=True, retval=True)
def polymorphic_union(
    table_map, typecolname, aliasname="p_union", cast_nulls=True
):
    """Create a ``UNION`` statement used by a polymorphic mapper.

    See :ref:`concrete_inheritance` for an example of how
    this is used.

    Note that each value of ``table_map`` is replaced in place with its
    coerced FROM clause.

    :param table_map: mapping of polymorphic identities to
     :class:`_schema.Table` objects.
    :param typecolname: string name of a "discriminator" column, which will be
     derived from the query, producing the polymorphic identity for
     each row.  If ``None``, no polymorphic discriminator is generated.
    :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()`
     construct generated.
    :param cast_nulls: if True, non-existent columns, which are represented
     as labeled NULLs, will be passed into CAST.   This is a legacy behavior
     that is problematic on some backends such as Oracle - in which case it
     can be set to False.

    """

    colnames: util.OrderedSet[str] = util.OrderedSet()
    columns_by_table = {}
    column_types = {}

    # pass 1: coerce every entry and collect the superset of column keys
    for identity in table_map:
        selectable = coercions.expect(
            roles.StrictFromClauseRole, table_map[identity], allow_select=True
        )
        table_map[identity] = selectable

        by_key = {}
        for column in selectable.c:
            if column.key == typecolname:
                raise sa_exc.InvalidRequestError(
                    "Polymorphic union can't use '%s' as the discriminator "
                    "column due to mapped column %r; please apply the "
                    "'typecolname' "
                    "argument; this is available on "
                    "ConcreteBase as '_concrete_discriminator_name'"
                    % (typecolname, column)
                )
            colnames.add(column.key)
            by_key[column.key] = column
            column_types[column.key] = column.type
        columns_by_table[selectable] = by_key

    # how a column missing from a given table is rendered as a typed NULL
    typed_null = sql.cast if cast_nulls else sql.type_coerce

    def col(name, table):
        try:
            return columns_by_table[table][name]
        except KeyError:
            return typed_null(sql.null(), column_types[name]).label(name)

    # pass 2: one SELECT per table, all with the same list of columns
    selects = []
    for type_, table in table_map.items():
        exprs = [col(name, table) for name in colnames]
        if typecolname is not None:
            exprs.append(
                sql.literal_column(sql_util._quote_ddl_expr(type_)).label(
                    typecolname
                )
            )
        selects.append(sql.select(*exprs).select_from(table))

    return sql.union_all(*selects).alias(aliasname)
def identity_key(
    class_: Optional[Type[_T]] = None,
    ident: Union[Any, Tuple[Any, ...]] = None,
    *,
    instance: Optional[_T] = None,
    row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None,
    identity_token: Optional[Any] = None,
) -> _IdentityKeyType[_T]:
    r"""Generate "identity key" tuples, as are used as keys in the
    :attr:`.Session.identity_map` dictionary.

    Three call styles are supported:

    * ``identity_key(class, ident, identity_token=token)``

      Receives a mapped class together with a primary key given as a
      scalar or as a tuple.

      E.g.::

        >>> identity_key(MyClass, (1, 2))
        (<class '__main__.MyClass'>, (1, 2), None)

      :param class: mapped class (must be a positional argument)
      :param ident: primary key, may be a scalar or tuple argument.
      :param identity_token: optional identity token

        .. versionadded:: 1.2 added identity_token


    * ``identity_key(instance=instance)``

      Produces the identity key of the given instance.  The instance need
      not be persistent; only its primary key attributes need to be
      populated (the key will contain ``None`` for those that are missing).

      E.g.::

        >>> instance = MyClass(1, 2)
        >>> identity_key(instance=instance)
        (<class '__main__.MyClass'>, (1, 2), None)

      In this form, the given instance is ultimately run through
      :meth:`_orm.Mapper.identity_key_from_instance`, which will have the
      effect of performing a database check for the corresponding row
      if the object is expired.

      :param instance: object instance (must be given as a keyword arg)

    * ``identity_key(class, row=row, identity_token=token)``

      Like the class/tuple form, but takes the primary key from a database
      result row given as a :class:`.Row` or :class:`.RowMapping` object.

      E.g.::

        >>> row = engine.execute(\
            text("select * from table where a=1 and b=2")\
            ).first()
        >>> identity_key(MyClass, row=row)
        (<class '__main__.MyClass'>, (1, 2), None)

      :param class: mapped class (must be a positional argument)
      :param row: :class:`.Row` row returned by a
       :class:`_engine.CursorResult` (must be given as a keyword arg)
      :param identity_token: optional identity token

        .. versionadded:: 1.2 added identity_token

    :raises sqlalchemy.exc.ArgumentError: if neither a class nor an
     instance is given, or a class is given without ``ident`` or ``row``.

    """
    if class_ is None:
        # instance form; ident / row / identity_token are not consulted
        if instance is None:
            raise sa_exc.ArgumentError("class or instance is required")
        return object_mapper(instance).identity_key_from_instance(instance)

    mapper = class_mapper(class_)

    # a row takes precedence over ident when both are passed
    if row is not None:
        return mapper.identity_key_from_row(
            row, identity_token=identity_token
        )

    if ident is None:
        raise sa_exc.ArgumentError("ident or row is required")

    primary_key = tuple(util.to_list(ident))
    return mapper.identity_key_from_primary_key(
        primary_key, identity_token=identity_token
    )
class _TraceAdaptRole(enum.Enum):
    """Enumeration of all the use cases for ORMAdapter.

    ORMAdapter is among the most complicated parts of the ORM.  It performs
    in-place adaption of column expressions that are to be applied to a
    SELECT, replacing :class:`.Table` and other objects that are mapped to
    classes with aliases of those tables (joined eager loading) or with
    whole user-defined subqueries (polymorphic loading as used with
    concrete mappings or other custom "with polymorphic" parameters).

    The members below serve three purposes: an overview of every ORMAdapter
    use case, a layer of formality around introducing new ones (none are
    anticipated), and a way to trace where a particular ORMAdapter came
    from when debugging at runtime.

    SQLAlchemy 2.0 has greatly scaled back ORM features which relied heavily
    on open-ended statement adaption, including the
    ``Query.with_polymorphic()`` method and the
    ``Query.select_from_entity()`` methods, favoring user-explicit aliasing
    schemes using the ``aliased()`` and ``with_polymorphic()`` standalone
    constructs; these still use adaption, however the adaption is applied
    in a narrower scope.

    """

    # -- aliased(): adapts individual attributes while the query is being
    # constructed
    ALIASED_INSP = enum.auto()

    # -- joinedload: typically adapts the ON clause of a relationship join
    JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto()
    JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto()
    JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto()

    # -- polymorphic: the complex cases, which replace FROM clauses,
    # swapping tables for subqueries
    MAPPER_POLYMORPHIC_ADAPTER = enum.auto()
    WITH_POLYMORPHIC_ADAPTER = enum.auto()
    WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto()
    DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto()

    # -- from_statement(): only adapts individual attributes of a given
    # statement to local ORM attributes at result fetching time.
    # assigned to ORMCompileState._from_obj_alias
    ADAPT_FROM_STATEMENT = enum.auto()

    # -- joinedload combined with LIMIT/OFFSET/DISTINCT: the query is
    # wrapped in a subquery carrying the LIMIT/OFFSET/etc. and the
    # joinedloads are placed on the outside.
    # assigned to ORMCompileState.compound_eager_adapter
    COMPOUND_EAGER_STATEMENT = enum.auto()

    # -- legacy Query._set_select_from(): needed for Query's set operations
    # (i.e. UNION, etc.) and for "legacy from_self()", which is no longer
    # public API in 2.0 but still backs Query.count().  this one still
    # does full statement traversal.
    # assigned to ORMCompileState._from_obj_alias
    LEGACY_SELECT_FROM_ALIAS = enum.auto()
class ORMStatementAdapter(sql_util.ColumnAdapter):
    """A ColumnAdapter that additionally records the
    :class:`._TraceAdaptRole` it was created for in ``role``.

    All other arguments are forwarded unchanged to the ColumnAdapter
    constructor.
    """

    __slots__ = ("role",)

    def __init__(
        self,
        role: _TraceAdaptRole,
        selectable: Selectable,
        *,
        equivalents: Optional[_EquivalentColumnMap] = None,
        adapt_required: bool = False,
        allow_label_resolve: bool = True,
        anonymize_labels: bool = False,
        adapt_on_names: bool = False,
        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
    ):
        # recorded before the base class is initialized
        self.role = role

        super().__init__(
            selectable,
            equivalents=equivalents,
            adapt_required=adapt_required,
            adapt_on_names=adapt_on_names,
            adapt_from_selectables=adapt_from_selectables,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
        )
class ORMAdapter(sql_util.ColumnAdapter):
    """ColumnAdapter subclass which excludes adaptation of entities from
    non-matching mappers.

    The adapter records the role it was created for, the mapper of the
    given entity, and whether that entity is an aliased class.
    """

    __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp")

    is_aliased_class: bool
    aliased_insp: Optional[AliasedInsp[Any]]

    def __init__(
        self,
        role: _TraceAdaptRole,
        entity: _InternalEntityType[Any],
        *,
        equivalents: Optional[_EquivalentColumnMap] = None,
        adapt_required: bool = False,
        allow_label_resolve: bool = True,
        anonymize_labels: bool = False,
        selectable: Optional[Selectable] = None,
        limit_on_entity: bool = True,
        adapt_on_names: bool = False,
        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
    ):
        self.role = role
        self.mapper = entity.mapper

        # adapt to the entity's own selectable unless one was given
        if selectable is None:
            selectable = entity.selectable

        if insp_is_aliased_class(entity):
            self.is_aliased_class, self.aliased_insp = True, entity
        else:
            self.is_aliased_class, self.aliased_insp = False, None

        # limit_on_entity=False disables the per-mapper filter entirely
        include_fn = self._include_fn if limit_on_entity else None

        super().__init__(
            selectable,
            equivalents=equivalents,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            include_fn=include_fn,
            adapt_on_names=adapt_on_names,
            adapt_from_selectables=adapt_from_selectables,
        )

    def _include_fn(self, elem):
        """Return a true value if ``elem`` may be adapted: it either has no
        "parentmapper" annotation, or that mapper is related to this
        adapter's mapper by ``isa()`` in either direction."""
        parent = elem._annotations.get("parentmapper")
        if not parent:
            return True
        return parent.isa(self.mapper) or self.mapper.isa(parent)
class AliasedClass(
    inspection.Inspectable["AliasedInsp[_O]"], ORMColumnsClauseRole[_O]
):
    r"""Represents an "aliased" form of a mapped class for usage with Query.

    This is the ORM counterpart of a
    :func:`~sqlalchemy.sql.expression.alias` construct: the object imitates
    the mapped class by way of a ``__getattr__`` scheme while holding a
    reference to a real :class:`~sqlalchemy.sql.expression.Alias` object.

    The main use of :class:`.AliasedClass` is as a stand-in for a mapped
    entity inside of an ORM-generated SQL statement, so that the same
    entity can appear in several contexts at once.  A simple example::

        # find all pairs of users with the same name
        user_alias = aliased(User)
        session.query(User, user_alias).\
                        join((user_alias, User.id > user_alias.id)).\
                        filter(User.name == user_alias.name)

    :class:`.AliasedClass` can also map an existing mapped class to an
    entirely new selectable, as long as that selectable is
    column-compatible with the existing mapped selectable, and it may be
    configured in a mapping as the target of a :func:`_orm.relationship`.
    See the links below for examples.

    Instances are normally created with the :func:`_orm.aliased` function;
    :func:`_orm.with_polymorphic` produces them as well, with additional
    configuration.

    The resulting object is an instance of :class:`.AliasedClass`.  Its
    attribute scheme yields the same attribute and method interface as the
    original mapped class, which keeps :class:`.AliasedClass` compatible
    with any attribute technique that works on the original class,
    including hybrid attributes (see :ref:`hybrids_toplevel`).

    The underlying :class:`_orm.Mapper`, aliased selectable, and other
    information are available by inspecting the object with
    :func:`_sa.inspect`::

        from sqlalchemy import inspect
        my_alias = aliased(MyClass)
        insp = inspect(my_alias)

    The resulting inspection object is an instance of :class:`.AliasedInsp`.


    .. seealso::

        :func:`.aliased`

        :func:`.with_polymorphic`

        :ref:`relationship_aliased_class`

        :ref:`relationship_to_window_function`


    """

    __name__: str

    def __init__(
        self,
        mapped_class_or_ac: _EntityType[_O],
        alias: Optional[FromClause] = None,
        name: Optional[str] = None,
        flat: bool = False,
        adapt_on_names: bool = False,
        with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]] = None,
        with_polymorphic_discriminator: Optional[ColumnElement[Any]] = None,
        base_alias: Optional[AliasedInsp[Any]] = None,
        use_mapper_path: bool = False,
        represents_outer_join: bool = False,
    ):
        insp = cast(
            "_InternalEntityType[_O]", inspection.inspect(mapped_class_or_ac)
        )
        mapper = insp.mapper

        nest_adapters = False

        if alias is not None:
            # an explicit selectable applied to something that is already
            # aliased: AliasedInsp is told to nest the adapters
            if insp.is_aliased_class:
                nest_adapters = True
        elif insp.is_aliased_class and insp.selectable._is_subquery:
            alias = insp.selectable.alias()
        else:
            poly_selectable = mapper._with_polymorphic_selectable
            alias = poly_selectable._anonymous_fromclause(
                name=name,
                flat=flat,
            )

        assert alias is not None

        # explicit arguments win over the mapper-level configuration
        poly_mappers = (
            with_polymorphic_mappers or mapper.with_polymorphic_mappers
        )
        if with_polymorphic_discriminator is not None:
            discriminator = with_polymorphic_discriminator
        else:
            discriminator = mapper.polymorphic_on

        self._aliased_insp = AliasedInsp(
            self,
            insp,
            alias,
            name,
            poly_mappers,
            discriminator,
            base_alias,
            use_mapper_path,
            adapt_on_names,
            represents_outer_join,
            nest_adapters,
        )

        self.__name__ = f"aliased({mapper.class_.__name__})"

    @classmethod
    def _reconstitute_from_aliased_insp(
        cls, aliased_insp: AliasedInsp[_O]
    ) -> AliasedClass[_O]:
        """Build an AliasedClass around an existing AliasedInsp without
        running ``__init__``, re-creating the per-subclass attributes of a
        with_polymorphic entity along the way."""
        obj = cls.__new__(cls)
        obj.__name__ = f"aliased({aliased_insp.mapper.class_.__name__})"
        obj._aliased_insp = aliased_insp

        if not aliased_insp._is_with_polymorphic:
            return obj

        for sub_insp in aliased_insp._with_polymorphic_entities:
            if sub_insp is aliased_insp:
                continue
            ent = AliasedClass._reconstitute_from_aliased_insp(sub_insp)
            setattr(obj, sub_insp.class_.__name__, ent)

        return obj

    def __getattr__(self, key: str) -> Any:
        # looked up through __dict__ so that a missing _aliased_insp does
        # not re-enter __getattr__
        try:
            insp = self.__dict__["_aliased_insp"]
        except KeyError:
            raise AttributeError()

        # plain getattr() on the target keeps all of its lookup mechanics
        attr = getattr(insp._target, key)

        # a bound method: rebind the same function to this object
        is_bound_method = hasattr(attr, "__call__") and hasattr(
            attr, "__self__"
        )
        if is_bound_method:
            return types.MethodType(attr.__func__, self)

        # a descriptor: invoke it with this object as the owner
        if hasattr(attr, "__get__"):
            attr = attr.__get__(None, self)

        # attributes of the QueryableAttribute system need to be adapted
        # to this entity; the adapted attribute is then stored on self
        if hasattr(attr, "adapt_to_entity"):
            attr = attr.adapt_to_entity(insp)
            setattr(self, key, attr)

        return attr

    def _get_from_serialized(
        self, key: str, mapped_class: _O, aliased_insp: AliasedInsp[_O]
    ) -> Any:
        # this method is only used in terms of the
        # sqlalchemy.ext.serializer extension
        attr = getattr(mapped_class, key)

        # a bound method: rebind the same function to this object
        is_bound_method = hasattr(attr, "__call__") and hasattr(
            attr, "__self__"
        )
        if is_bound_method:
            return types.MethodType(attr.__func__, self)

        # a descriptor: invoke it with this object as the owner
        if hasattr(attr, "__get__"):
            attr = attr.__get__(None, self)

        # attributes of the QueryableAttribute system need to be adapted;
        # the inspection object is first pointed back at this object
        if hasattr(attr, "adapt_to_entity"):
            aliased_insp._weak_entity = weakref.ref(self)
            attr = attr.adapt_to_entity(aliased_insp)
            setattr(self, key, attr)

        return attr

    def __repr__(self) -> str:
        return (
            f"<AliasedClass at 0x{id(self):x}; "
            f"{self._aliased_insp._target.__name__!s}>"
        )

    def __str__(self) -> str:
        return str(self._aliased_insp)
855@inspection._self_inspects
856class AliasedInsp(
857 ORMEntityColumnsClauseRole[_O],
858 ORMFromClauseRole,
859 HasCacheKey,
860 InspectionAttr,
861 MemoizedSlots,
862 inspection.Inspectable["AliasedInsp[_O]"],
863 Generic[_O],
864):
865 """Provide an inspection interface for an
866 :class:`.AliasedClass` object.
868 The :class:`.AliasedInsp` object is returned
869 given an :class:`.AliasedClass` using the
870 :func:`_sa.inspect` function::
872 from sqlalchemy import inspect
873 from sqlalchemy.orm import aliased
875 my_alias = aliased(MyMappedClass)
876 insp = inspect(my_alias)
878 Attributes on :class:`.AliasedInsp`
879 include:
881 * ``entity`` - the :class:`.AliasedClass` represented.
882 * ``mapper`` - the :class:`_orm.Mapper` mapping the underlying class.
883 * ``selectable`` - the :class:`_expression.Alias`
884 construct which ultimately
885 represents an aliased :class:`_schema.Table` or
886 :class:`_expression.Select`
887 construct.
888 * ``name`` - the name of the alias. Also is used as the attribute
889 name when returned in a result tuple from :class:`_query.Query`.
890 * ``with_polymorphic_mappers`` - collection of :class:`_orm.Mapper`
891 objects
892 indicating all those mappers expressed in the select construct
893 for the :class:`.AliasedClass`.
894 * ``polymorphic_on`` - an alternate column or SQL expression which
895 will be used as the "discriminator" for a polymorphic load.
897 .. seealso::
899 :ref:`inspection_toplevel`
901 """
903 __slots__ = (
904 "__weakref__",
905 "_weak_entity",
906 "mapper",
907 "selectable",
908 "name",
909 "_adapt_on_names",
910 "with_polymorphic_mappers",
911 "polymorphic_on",
912 "_use_mapper_path",
913 "_base_alias",
914 "represents_outer_join",
915 "persist_selectable",
916 "local_table",
917 "_is_with_polymorphic",
918 "_with_polymorphic_entities",
919 "_adapter",
920 "_target",
921 "__clause_element__",
922 "_memoized_values",
923 "_all_column_expressions",
924 "_nest_adapters",
925 )
927 _cache_key_traversal = [
928 ("name", visitors.ExtendedInternalTraversal.dp_string),
929 ("_adapt_on_names", visitors.ExtendedInternalTraversal.dp_boolean),
930 ("_use_mapper_path", visitors.ExtendedInternalTraversal.dp_boolean),
931 ("_target", visitors.ExtendedInternalTraversal.dp_inspectable),
932 ("selectable", visitors.ExtendedInternalTraversal.dp_clauseelement),
933 (
934 "with_polymorphic_mappers",
935 visitors.InternalTraversal.dp_has_cache_key_list,
936 ),
937 ("polymorphic_on", visitors.InternalTraversal.dp_clauseelement),
938 ]
940 mapper: Mapper[_O]
941 selectable: FromClause
942 _adapter: ORMAdapter
943 with_polymorphic_mappers: Sequence[Mapper[Any]]
944 _with_polymorphic_entities: Sequence[AliasedInsp[Any]]
946 _weak_entity: weakref.ref[AliasedClass[_O]]
947 """the AliasedClass that refers to this AliasedInsp"""
949 _target: Union[Type[_O], AliasedClass[_O]]
950 """the thing referenced by the AliasedClass/AliasedInsp.
952 In the vast majority of cases, this is the mapped class. However
953 it may also be another AliasedClass (alias of alias).
955 """
def __init__(
    self,
    entity: AliasedClass[_O],
    inspected: _InternalEntityType[_O],
    selectable: FromClause,
    name: Optional[str],
    with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]],
    polymorphic_on: Optional[ColumnElement[Any]],
    _base_alias: Optional[AliasedInsp[Any]],
    _use_mapper_path: bool,
    adapt_on_names: bool,
    represents_outer_join: bool,
    nest_adapters: bool,
):
    """Set up the inspection state for ``entity``.

    ``entity`` is only held through a weak reference.  When
    ``with_polymorphic_mappers`` is non-empty, an :class:`.AliasedClass`
    is created against the same selectable for every mapper other than
    the inspected one, and is attached to the entity under the mapped
    class' name.
    """
    target = inspected.entity
    mapper = inspected.mapper

    self._weak_entity = weakref.ref(entity)
    self.mapper = mapper

    # the same FROM clause is published under three attribute names
    self.selectable = selectable
    self.persist_selectable = selectable
    self.local_table = selectable

    self.name = name
    self.polymorphic_on = polymorphic_on
    # falls back to a weak reference to this object itself
    self._base_alias = weakref.ref(_base_alias or self)
    self._use_mapper_path = _use_mapper_path
    self.represents_outer_join = represents_outer_join
    self._nest_adapters = nest_adapters

    if with_polymorphic_mappers:
        self._is_with_polymorphic = True
        self.with_polymorphic_mappers = with_polymorphic_mappers
        self._with_polymorphic_entities = []

        for poly in self.with_polymorphic_mappers:
            if poly is mapper:
                continue

            sub_entity = AliasedClass(
                poly.class_,
                selectable,
                base_alias=self,
                adapt_on_names=adapt_on_names,
                use_mapper_path=_use_mapper_path,
            )
            setattr(self.entity, poly.class_.__name__, sub_entity)
            self._with_polymorphic_entities.append(
                sub_entity._aliased_insp
            )
    else:
        self._is_with_polymorphic = False
        self.with_polymorphic_mappers = [mapper]

    self._adapter = ORMAdapter(
        _TraceAdaptRole.ALIASED_INSP,
        mapper,
        selectable=selectable,
        equivalents=mapper._equivalent_columns,
        adapt_on_names=adapt_on_names,
        anonymize_labels=True,
        # make sure the adapter doesn't try to grab other tables that
        # are not even the thing we are mapping, such as embedded
        # selectables in subqueries or CTEs.  See issue #6060
        adapt_from_selectables={
            poly.selectable
            for poly in self.with_polymorphic_mappers
            if not adapt_on_names
        },
        limit_on_entity=False,
    )

    if nest_adapters:
        # supports "aliased class of aliased class" use case
        assert isinstance(inspected, AliasedInsp)
        self._adapter = inspected._adapter.wrap(self._adapter)

    self._adapt_on_names = adapt_on_names
    self._target = target
@classmethod
def _alias_factory(
    cls,
    element: Union[_EntityType[_O], FromClause],
    alias: Optional[FromClause] = None,
    name: Optional[str] = None,
    flat: bool = False,
    adapt_on_names: bool = False,
) -> Union[AliasedClass[_O], FromClause]:
    """Produce an alias of an ORM entity or of a plain FROM clause.

    A :class:`.FromClause` is aliased directly (named when ``name`` is
    given, anonymously otherwise); the ``alias`` argument is not consulted
    in that case.  Anything else is wrapped in an :class:`.AliasedClass`.

    :raises ArgumentError: if ``adapt_on_names`` is set for a
     :class:`.FromClause`.
    """
    if not isinstance(element, FromClause):
        return AliasedClass(
            element,
            alias=alias,
            flat=flat,
            name=name,
            adapt_on_names=adapt_on_names,
        )

    if adapt_on_names:
        raise sa_exc.ArgumentError(
            "adapt_on_names only applies to ORM elements"
        )

    if not name:
        return coercions.expect(
            roles.AnonymizedFromClauseRole, element, flat=flat
        )
    return element.alias(name=name, flat=flat)
@classmethod
def _with_polymorphic_factory(
    cls,
    base: Union[Type[_O], Mapper[_O]],
    classes: Union[Literal["*"], Iterable[_EntityType[Any]]],
    selectable: Union[Literal[False, None], FromClause] = False,
    flat: bool = False,
    polymorphic_on: Optional[ColumnElement[Any]] = None,
    aliased: bool = False,
    innerjoin: bool = False,
    adapt_on_names: bool = False,
    name: Optional[str] = None,
    _use_mapper_path: bool = False,
) -> AliasedClass[_O]:
    """Build the :class:`.AliasedClass` behind ``with_polymorphic()``.

    The mappers and selectable are resolved by the primary mapper's
    ``_with_polymorphic_args()``; when ``aliased`` or ``flat`` is set the
    selectable is replaced by its anonymous FROM clause.

    :raises ArgumentError: if both ``flat`` and an explicit ``selectable``
     are given.
    """
    primary_mapper = _class_to_mapper(base)

    has_explicit_selectable = selectable not in (None, False)
    if has_explicit_selectable and flat:
        raise sa_exc.ArgumentError(
            "the 'flat' and 'selectable' arguments cannot be passed "
            "simultaneously to with_polymorphic()"
        )

    mappers, selectable = primary_mapper._with_polymorphic_args(
        classes, selectable, innerjoin=innerjoin
    )
    if aliased or flat:
        assert selectable is not None
        selectable = selectable._anonymous_fromclause(flat=flat)

    alias_kw = {
        "name": name,
        "with_polymorphic_mappers": mappers,
        "adapt_on_names": adapt_on_names,
        "with_polymorphic_discriminator": polymorphic_on,
        "use_mapper_path": _use_mapper_path,
        "represents_outer_join": not innerjoin,
    }
    return AliasedClass(base, selectable, **alias_kw)
@property
def entity(self) -> AliasedClass[_O]:
    """Return the :class:`.AliasedClass` that refers to this object.

    The AliasedClass is only held weakly, to avoid a reference cycle.  It
    can therefore disappear, in particular when it was created internally
    and only this inspection object was passed around.  In that case a
    replacement is reconstituted on demand and the weak reference is
    refreshed.
    """
    aliased_cls = self._weak_entity()
    if aliased_cls is not None:
        return aliased_cls

    aliased_cls = AliasedClass._reconstitute_from_aliased_insp(self)
    self._weak_entity = weakref.ref(aliased_cls)
    return aliased_cls
# class-level flag identifying this inspection object as describing an
# aliased class
is_aliased_class = True
"always returns True"
def _memoized_method___clause_element__(self) -> FromClause:
    """Return the selectable annotated as belonging to this entity.

    The annotations record the parent mapper, this object as the parent
    entity and entity namespace; the propagated attributes select the
    "orm" compile-state plugin with this object as its subject.
    """
    annotated = self.selectable._annotate(
        {
            "parentmapper": self.mapper,
            "parententity": self,
            "entity_namespace": self,
        }
    )
    propagate = {"compile_state_plugin": "orm", "plugin_subject": self}
    return annotated._set_propagate_attrs(propagate)
@property
def entity_namespace(self) -> AliasedClass[_O]:
    """Return the namespace used for attribute lookup: the aliased
    class itself (same object as :attr:`.entity`)."""
    namespace = self.entity
    return namespace
@property
def class_(self) -> Type[_O]:
    """Return the mapped class ultimately represented by this
    :class:`.AliasedInsp`, as reported by its mapper."""
    owning_mapper = self.mapper
    return owning_mapper.class_
@property
def _path_registry(self) -> AbstractEntityRegistry:
    """Return the path registry for this entity.

    With ``_use_mapper_path`` set, the mapper's own registry is shared;
    otherwise a registry specific to this aliased entity is produced.
    """
    if not self._use_mapper_path:
        return PathRegistry.per_mapper(self)
    return self.mapper._path_registry
def __getstate__(self) -> Dict[str, Any]:
    """Return the pickle state: the values needed to re-run
    ``__init__`` from ``__setstate__``, keyed by name."""
    state: Dict[str, Any] = {}
    state["entity"] = self.entity
    state["mapper"] = self.mapper
    state["alias"] = self.selectable
    state["name"] = self.name
    state["adapt_on_names"] = self._adapt_on_names
    state["with_polymorphic_mappers"] = self.with_polymorphic_mappers
    state["with_polymorphic_discriminator"] = self.polymorphic_on
    # the base alias is stored weakly; pickle the referent itself
    state["base_alias"] = self._base_alias()
    state["use_mapper_path"] = self._use_mapper_path
    state["represents_outer_join"] = self.represents_outer_join
    state["nest_adapters"] = self._nest_adapters
    return state
def __setstate__(self, state: Dict[str, Any]) -> None:
    """Restore from pickle state by re-running ``__init__``.

    The state values are passed positionally, in the order of the
    ``__init__`` parameters; the pickled mapper takes the place of the
    ``inspected`` argument.
    """
    init_order = (
        "entity",
        "mapper",
        "alias",
        "name",
        "with_polymorphic_mappers",
        "with_polymorphic_discriminator",
        "base_alias",
        "use_mapper_path",
        "adapt_on_names",
        "represents_outer_join",
        "nest_adapters",
    )
    init_args = [state[key] for key in init_order]
    self.__init__(*init_args)  # type: ignore
def _merge_with(self, other: AliasedInsp[_O]) -> AliasedInsp[_O]:
    """Combine the polymorphic class sets of this entity and ``other``.

    Both must share the same primary mapper.  When the two already cover
    the same classes, ``other`` is returned unchanged; otherwise a new
    aliased entity over the union of the classes is built, taking its
    discriminator, path and outer-join settings from ``other``.
    """
    # assert self._is_with_polymorphic
    # assert other._is_with_polymorphic

    primary_mapper = other.mapper

    assert self.mapper is primary_mapper

    ours = util.to_set(mp.class_ for mp in self.with_polymorphic_mappers)
    theirs = {mp.class_ for mp in other.with_polymorphic_mappers}
    if ours == theirs:
        return other

    combined = ours.union(theirs)
    mappers, selectable = primary_mapper._with_polymorphic_args(
        combined, None, innerjoin=not other.represents_outer_join
    )
    selectable = selectable._anonymous_fromclause(flat=True)
    merged = AliasedClass(
        primary_mapper,
        selectable,
        with_polymorphic_mappers=mappers,
        with_polymorphic_discriminator=other.polymorphic_on,
        use_mapper_path=other._use_mapper_path,
        represents_outer_join=other.represents_outer_join,
    )
    return merged._aliased_insp
def _adapt_element(
    self, expr: _ORMCOLEXPR, key: Optional[str] = None
) -> _ORMCOLEXPR:
    """Adapt a column expression to this aliased entity.

    The expression is run through this entity's adapter, annotated with
    this object as parent entity (plus the parent mapper and, when ``key``
    is truthy, a ``proxy_key``), and tagged for the "orm" compile-state
    plugin.
    """
    assert isinstance(expr, ColumnElement)

    annotations: Dict[str, Any] = {
        "parententity": self,
        "parentmapper": self.mapper,
    }
    if key:
        annotations["proxy_key"] = key

    # IMO mypy should see this one also as returning the same type
    # we put into it, but it's not
    adapted = self._adapter.traverse(expr)
    annotated = adapted._annotate(annotations)
    return annotated._set_propagate_attrs(
        {"compile_state_plugin": "orm", "plugin_subject": self}
    )
if TYPE_CHECKING:
    # establish compatibility with the _ORMAdapterProto protocol,
    # which in turn is compatible with _CoreAdapterProto.
    # This stub is only seen by type checkers.

    def _orm_adapt_element(
        self,
        obj: _CE,
        key: Optional[str] = None,
    ) -> _CE: ...

else:
    # at runtime the name is simply a second reference to _adapt_element
    _orm_adapt_element = _adapt_element
def _entity_for_mapper(self, mapper):
    """Return the aliased inspection object that corresponds to ``mapper``.

    :param mapper: a mapper expected to be related to this entity.
    :return: this object when ``mapper`` is the primary mapper or
     inherits from it; the sub-entity stored on the aliased class under
     the mapped class name when ``mapper`` is one of the other
     with-polymorphic mappers.
    :raises AssertionError: if ``mapper`` is unrelated to this entity.
    """
    self_poly = self.with_polymorphic_mappers
    if mapper in self_poly:
        if mapper is self.mapper:
            return self
        # sub-entities are attached to the aliased class by class name
        return getattr(self.entity, mapper.class_.__name__)._aliased_insp

    if mapper.isa(self.mapper):
        return self

    # raised explicitly rather than via ``assert False`` so that the
    # failure is not stripped (silently returning None) under ``-O``
    raise AssertionError(
        "mapper %s doesn't correspond to %s" % (mapper, self)
    )
def _memoized_attr__get_clause(self):
    """Return the mapper's ``_get_clause`` adapted to this entity.

    The result is a 2-tuple: the adapted ON clause, and the mapper's
    column-to-parameter map re-keyed by the adapted columns.
    """
    onclause, replacemap = self.mapper._get_clause
    adapt = self._adapter.traverse

    adapted_clause = adapt(onclause)
    adapted_params = {adapt(col): param for col, param in replacemap.items()}
    return adapted_clause, adapted_params
def _memoized_attr__memoized_values(self):
    """Return a new, empty dictionary; ``_memo()`` reads and writes
    ``self._memoized_values`` as its cache."""
    return dict()
def _memoized_attr__all_column_expressions(self):
    """Return a :class:`.ColumnCollection` of the mapper's columns, each
    adapted to this entity.

    For a with-polymorphic entity the mappers of the polymorphic
    sub-entities are passed along to the mapper's column lookup.
    """
    if self._is_with_polymorphic:
        sub_mappers = [ent.mapper for ent in self._with_polymorphic_entities]
        keyed_columns = self.mapper._columns_plus_keys(sub_mappers)
    else:
        keyed_columns = self.mapper._columns_plus_keys()

    adapted = [(key, self._adapt_element(col)) for key, col in keyed_columns]
    return ColumnCollection(adapted)
def _memo(self, key, callable_, *args, **kw):
    """Return the value cached under ``key``, computing it on first use.

    On a miss, ``callable_(*args, **kw)`` is invoked and its result is
    stored in ``self._memoized_values`` before being returned.
    """
    if key not in self._memoized_values:
        computed = callable_(*args, **kw)
        self._memoized_values[key] = computed
        return computed
    return self._memoized_values[key]
def __repr__(self):
    """Return ``<AliasedInsp at 0x...; Name(Poly, ...)>``; the
    parenthesized list names every with-polymorphic mapper's class and is
    omitted when there are none."""
    poly_mappers = self.with_polymorphic_mappers
    with_poly = (
        "(%s)" % ", ".join(mp.class_.__name__ for mp in poly_mappers)
        if poly_mappers
        else ""
    )
    return "<AliasedInsp at 0x%x; %s%s>" % (
        id(self),
        self.class_.__name__,
        with_poly,
    )
def __str__(self):
    """Return ``aliased(Target)`` or, for a with-polymorphic entity,
    ``with_polymorphic(Target, [Sub, ...])`` listing every polymorphic
    class other than the primary mapper's."""
    if not self._is_with_polymorphic:
        return "aliased(%s)" % (self._target.__name__,)

    target_name = self._target.__name__
    sub_names = ", ".join(
        mp.class_.__name__
        for mp in self.with_polymorphic_mappers
        if mp is not self.mapper
    )
    return "with_polymorphic(%s, [%s])" % (target_name, sub_names)
class _WrapUserEntity:
    """A wrapper used within the loader_criteria lambda caller so that
    we can bypass declared_attr descriptors on unmapped mixins, which
    normally emit a warning for such use.

    might also be useful for other per-lambda instrumentations should
    the need arise.

    """

    __slots__ = ("subject",)

    def __init__(self, subject):
        self.subject = subject

    @util.preload_module("sqlalchemy.orm.decl_api")
    def __getattribute__(self, name):
        """Proxy attribute access to the wrapped subject.

        A ``declared_attr`` found directly in the subject's ``__dict__``
        is invoked through its ``fget`` instead of through the descriptor
        protocol; everything else is a plain ``getattr``.
        """
        decl_api = util.preloaded.orm.decl_api

        # object.__getattribute__ avoids recursing into this method
        wrapped = object.__getattribute__(self, "subject")
        own_namespace = wrapped.__dict__
        if name in own_namespace:
            candidate = own_namespace[name]
            if isinstance(candidate, decl_api.declared_attr):
                return candidate.fget(wrapped)
        return getattr(wrapped, name)
class LoaderCriteriaOption(CriteriaOption):
    """Add additional WHERE criteria to the load for all occurrences of
    a particular entity.

    :class:`_orm.LoaderCriteriaOption` is invoked using the
    :func:`_orm.with_loader_criteria` function; see that function for
    details.

    .. versionadded:: 1.4

    """

    __slots__ = (
        "root_entity",
        "entity",
        "deferred_where_criteria",
        "where_criteria",
        "_where_crit_orig",
        "include_aliases",
        "propagate_to_loaders",
    )

    _traverse_internals = [
        ("root_entity", visitors.ExtendedInternalTraversal.dp_plain_obj),
        ("entity", visitors.ExtendedInternalTraversal.dp_has_cache_key),
        ("where_criteria", visitors.InternalTraversal.dp_clauseelement),
        ("include_aliases", visitors.InternalTraversal.dp_boolean),
        ("propagate_to_loaders", visitors.InternalTraversal.dp_boolean),
    ]

    root_entity: Optional[Type[Any]]
    entity: Optional[_InternalEntityType[Any]]
    where_criteria: Union[ColumnElement[bool], lambdas.DeferredLambdaElement]
    deferred_where_criteria: bool
    include_aliases: bool
    propagate_to_loaders: bool

    _where_crit_orig: Any

    def __init__(
        self,
        entity_or_base: _EntityType[Any],
        where_criteria: Union[
            _ColumnExpressionArgument[bool],
            Callable[[Any], _ColumnExpressionArgument[bool]],
        ],
        loader_only: bool = False,
        include_aliases: bool = False,
        propagate_to_loaders: bool = True,
        track_closure_variables: bool = True,
    ):
        """Construct the option.

        ``entity_or_base`` is inspected; when it is inspectable the
        result is stored as ``entity``, otherwise the object itself is
        stored as ``root_entity``.  A callable ``where_criteria`` is
        wrapped in a deferred lambda element; anything else is coerced to
        a WHERE/HAVING expression immediately.  ``loader_only`` is
        accepted but not used by this constructor.
        """
        inspected = cast(
            "_InternalEntityType[Any]",
            inspection.inspect(entity_or_base, False),
        )
        if inspected is not None:
            self.root_entity = None
            self.entity = inspected
        else:
            self.root_entity = cast("Type[Any]", entity_or_base)
            self.entity = None

        # kept so that __reduce__ can rebuild the option from the original
        self._where_crit_orig = where_criteria

        if not callable(where_criteria):
            self.deferred_where_criteria = False
            self.where_criteria = coercions.expect(
                roles.WhereHavingRole, where_criteria
            )
        else:
            if self.root_entity is None:
                assert inspected is not None
                lambda_subject = inspected.entity
            else:
                lambda_subject = self.root_entity

            self.deferred_where_criteria = True
            self.where_criteria = lambdas.DeferredLambdaElement(
                where_criteria,
                roles.WhereHavingRole,
                lambda_args=(_WrapUserEntity(lambda_subject),),
                opts=lambdas.LambdaOptions(
                    track_closure_variables=track_closure_variables
                ),
            )

        self.include_aliases = include_aliases
        self.propagate_to_loaders = propagate_to_loaders

    @classmethod
    def _unreduce(
        cls, entity, where_criteria, include_aliases, propagate_to_loaders
    ):
        """Rebuild an option from the arguments produced by
        ``__reduce__``."""
        rebuilt = LoaderCriteriaOption(
            entity,
            where_criteria,
            include_aliases=include_aliases,
            propagate_to_loaders=propagate_to_loaders,
        )
        return rebuilt

    def __reduce__(self):
        """Pickle as a call to ``_unreduce`` with the target class (or
        root entity), the original criteria and the two flags."""
        target = self.entity.class_ if self.entity else self.root_entity
        ctor_args = (
            target,
            self._where_crit_orig,
            self.include_aliases,
            self.propagate_to_loaders,
        )
        return (LoaderCriteriaOption._unreduce, ctor_args)

    def _all_mappers(self) -> Iterator[Mapper[Any]]:
        """Yield every mapper the criteria apply to.

        For an inspected entity these are its mapper and descendants.
        For an un-inspectable root class, the subclass tree is walked
        breadth-first; each subclass that inspects successfully
        contributes its mapper and descendants, the others have their own
        subclasses queued.
        """
        if not self.entity:
            assert self.root_entity
            pending = list(self.root_entity.__subclasses__())
            while pending:
                candidate = pending.pop(0)
                insp = cast(
                    "_InternalEntityType[Any]",
                    inspection.inspect(candidate, raiseerr=False),
                )
                if not insp:
                    pending.extend(candidate.__subclasses__())
                else:
                    yield from insp.mapper.self_and_descendants
        else:
            yield from self.entity.mapper.self_and_descendants

    def _should_include(self, compile_state: ORMCompileState) -> bool:
        """Return False when the statement being compiled is annotated as
        originating from this very option, True otherwise."""
        marker = compile_state.select_statement._annotations.get(
            "for_loader_criteria", None
        )
        return marker is not self

    def _resolve_where_criteria(
        self, ext_info: _InternalEntityType[Any]
    ) -> ColumnElement[bool]:
        """Return the criteria for ``ext_info``, deep-annotated with
        ``for_loader_criteria`` pointing back at this option.

        Deferred (lambda) criteria are resolved against
        ``ext_info.entity`` first.
        """
        crit: Any
        if self.deferred_where_criteria:
            crit = cast(
                "ColumnElement[bool]",
                self.where_criteria._resolve_with_args(ext_info.entity),
            )
        else:
            crit = self.where_criteria  # type: ignore
        assert isinstance(crit, ColumnElement)

        return sql_util._deep_annotate(
            crit,
            {"for_loader_criteria": self},
            detect_subquery_cols=True,
            ind_cols_on_fromclause=True,
        )

    def process_compile_state_replaced_entities(
        self,
        compile_state: ORMCompileState,
        mapper_entities: Iterable[_MapperEntity],
    ) -> None:
        """Delegate to :meth:`process_compile_state`; the replaced
        entities are not consulted."""
        self.process_compile_state(compile_state)

    def process_compile_state(self, compile_state: ORMCompileState) -> None:
        """Apply a modification to a given :class:`.CompileState`."""

        # if options to limit the criteria to immediate query only,
        # use compile_state.attributes instead

        self.get_global_criteria(compile_state.global_attributes)

    def get_global_criteria(self, attributes: Dict[Any, Any]) -> None:
        """Append this option to the ``("additional_entity_criteria",
        mapper)`` list in ``attributes`` for each applicable mapper."""
        for mp in self._all_mappers():
            criteria_key = ("additional_entity_criteria", mp)
            attributes.setdefault(criteria_key, []).append(self)
# register AliasedClass with the inspection system: inspecting an
# AliasedClass yields the object stored in its ``_aliased_insp`` attribute
inspection._inspects(AliasedClass)(lambda target: target._aliased_insp)
@inspection._inspects(type)
def _inspect_mc(
    class_: Type[_O],
) -> Optional[Mapper[_O]]:
    """Inspection hook for plain classes.

    Returns the class's mapper, or ``None`` when the class has no class
    manager, is not mapped, or the lookup raises one of the
    ``exc.NO_STATE`` exceptions.
    """
    try:
        manager = opt_manager_of_class(class_)
        if manager is not None and manager.is_mapped:
            return manager.mapper
    except exc.NO_STATE:
        pass
    return None
# runtime type of a subscripted ``typing`` generic such as ``List[Any]``;
# used below as an inspection registration key
GenericAlias = type(List[Any])
@inspection._inspects(GenericAlias)
def _inspect_generic_alias(
    class_: Type[_O],
) -> Optional[Mapper[_O]]:
    """Inspection hook for subscripted generics: inspect the generic's
    origin class instead, via :func:`_inspect_mc`."""
    return _inspect_mc(cast("Type[_O]", typing_get_origin(class_)))
@inspection._self_inspects
class Bundle(
    ORMColumnsClauseRole[_T],
    SupportsCloneAnnotations,
    MemoizedHasCacheKey,
    inspection.Inspectable["Bundle[_T]"],
    InspectionAttr,
):
    """A grouping of SQL expressions that are returned by a :class:`.Query`
    under one namespace.

    The :class:`.Bundle` essentially allows nesting of the tuple-based
    results returned by a column-oriented :class:`_query.Query` object.
    It also
    is extensible via simple subclassing, where the primary capability
    to override is that of how the set of expressions should be returned,
    allowing post-processing as well as custom return types, without
    involving ORM identity-mapped classes.

    .. seealso::

        :ref:`bundles`


    """

    single_entity = False
    """If True, queries for a single Bundle will be returned as a single
    entity, rather than an element within a keyed tuple."""

    is_clause_element = False

    is_mapper = False

    is_aliased_class = False

    is_bundle = True

    _propagate_attrs: _PropagateAttrsType = util.immutabledict()

    proxy_set = util.EMPTY_SET  # type: ignore

    exprs: List[_ColumnsClauseElement]

    def __init__(
        self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
    ):
        r"""Construct a new :class:`.Bundle`.

        e.g.::

            bn = Bundle("mybundle", MyClass.x, MyClass.y)

            for row in session.query(bn).filter(
                    bn.c.x == 5).filter(bn.c.y == 4):
                print(row.mybundle.x, row.mybundle.y)

        :param name: name of the bundle.
        :param \*exprs: columns or SQL expressions comprising the bundle.
        :param single_entity=False: if True, rows for this :class:`.Bundle`
         can be returned as a "single entity" outside of any enclosing tuple
         in the same manner as a mapped entity.

        """
        self.name = self._label = name
        coerced_exprs = [
            coercions.expect(
                roles.ColumnsClauseRole, expr, apply_propagate_attrs=self
            )
            for expr in exprs
        ]
        self.exprs = coerced_exprs

        # a nested Bundle arrives as an annotated clause list; unwrap it
        # back to the Bundle, which is keyed by its ``_label`` as it has
        # no ``key``
        self.c = self.columns = ColumnCollection(
            (getattr(col, "key", col._label), col)
            for col in [e._annotations.get("bundle", e) for e in coerced_exprs]
        ).as_readonly()
        self.single_entity = kw.pop("single_entity", self.single_entity)

    def _gen_cache_key(
        self, anon_map: anon_map, bindparams: List[BindParameter[Any]]
    ) -> Tuple[Any, ...]:
        """Return a cache key made of the class, name, ``single_entity``
        flag and the cache key of each expression."""
        return (self.__class__, self.name, self.single_entity) + tuple(
            [expr._gen_cache_key(anon_map, bindparams) for expr in self.exprs]
        )

    @property
    def mapper(self) -> Optional[Mapper[Any]]:
        """The ``parentmapper`` annotation of the first expression, or
        None when it has none."""
        mp: Optional[Mapper[Any]] = self.exprs[0]._annotations.get(
            "parentmapper", None
        )
        return mp

    @property
    def entity(self) -> Optional[_InternalEntityType[Any]]:
        """The ``parententity`` annotation of the first expression, or
        None when it has none."""
        ie: Optional[_InternalEntityType[Any]] = self.exprs[
            0
        ]._annotations.get("parententity", None)
        return ie

    @property
    def entity_namespace(
        self,
    ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]:
        """The namespace used for attribute lookup; same as
        :attr:`.Bundle.c`."""
        return self.c

    columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]

    """A namespace of SQL expressions referred to by this :class:`.Bundle`.

        e.g.::

            bn = Bundle("mybundle", MyClass.x, MyClass.y)

            q = sess.query(bn).filter(bn.c.x == 5)

        Nesting of bundles is also supported::

            b1 = Bundle("b1",
                    Bundle('b2', MyClass.a, MyClass.b),
                    Bundle('b3', MyClass.x, MyClass.y)
                )

            q = sess.query(b1).filter(
                b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9)

    .. seealso::

        :attr:`.Bundle.c`

    """

    c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
    """An alias for :attr:`.Bundle.columns`."""

    def _clone(self, **kw):
        """Return a shallow copy: a new instance of the same class whose
        ``__dict__`` is updated from this one's.  ``kw`` is ignored."""
        cloned = self.__class__.__new__(self.__class__)
        cloned.__dict__.update(self.__dict__)
        return cloned

    def __clause_element__(self):
        """Return a clause list of the bundle's expressions, annotated
        with this bundle and set to use the ORM compile-state plugin."""
        # ensure existing entity_namespace remains
        annotations = {"bundle": self, "entity_namespace": self}
        annotations.update(self._annotations)

        plugin_subject = self.exprs[0]._propagate_attrs.get(
            "plugin_subject", self.entity
        )
        return (
            expression.ClauseList(
                _literal_as_text_role=roles.ColumnsClauseRole,
                group=False,
                *[e._annotations.get("bundle", e) for e in self.exprs],
            )
            ._annotate(annotations)
            ._set_propagate_attrs(
                # the Bundle *must* use the orm plugin no matter what.  the
                # subject can be None but it's much better if it's not.
                {
                    "compile_state_plugin": "orm",
                    "plugin_subject": plugin_subject,
                }
            )
        )

    @property
    def clauses(self):
        """The clauses of :meth:`__clause_element__`."""
        return self.__clause_element__().clauses

    def label(self, name):
        """Provide a copy of this :class:`.Bundle` passing a new label."""

        cloned = self._clone()
        # keep ``_label`` in step with ``name``, as __init__ does; an
        # enclosing Bundle keys its column collection on ``_label``, so a
        # stale value would expose the relabeled bundle under its old name
        cloned.name = cloned._label = name
        return cloned

    def create_row_processor(
        self,
        query: Select[Unpack[TupleAny]],
        procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]],
        labels: Sequence[str],
    ) -> Callable[[Row[Unpack[TupleAny]]], Any]:
        """Produce the "row processing" function for this :class:`.Bundle`.

        May be overridden by subclasses to provide custom behaviors when
        results are fetched. The method is passed the statement object and a
        set of "row processor" functions at query execution time; these
        processor functions when given a result row will return the individual
        attribute value, which can then be adapted into any kind of return data
        structure.

        The example below illustrates replacing the usual :class:`.Row`
        return structure with a straight Python dictionary::

            from sqlalchemy.orm import Bundle

            class DictBundle(Bundle):
                def create_row_processor(self, query, procs, labels):
                    'Override create_row_processor to return dictionaries'

                    def proc(row):
                        return dict(
                            zip(labels, (proc(row) for proc in procs))
                        )
                    return proc

        A result from the above :class:`_orm.Bundle` will return dictionary
        values::

            bn = DictBundle('mybundle', MyClass.data1, MyClass.data2)
            for row in session.execute(
                select(bn).where(bn.c.data1 == 'd1')
            ):
                print(row.mybundle['data1'], row.mybundle['data2'])

        """
        keyed_tuple = result_tuple(labels, [() for _ in labels])

        def proc(row: Row[Unpack[TupleAny]]) -> Any:
            return keyed_tuple([proc(row) for proc in procs])

        return proc
def _orm_annotate(element: _SA, exclude: Optional[Any] = None) -> _SA:
    """Return a deep copy of the given ClauseElement in which each element
    carries the "_orm_adapt" annotation.

    Elements within the exclude collection will be cloned but not annotated.

    """
    orm_adapt_flag = {"_orm_adapt": True}
    return sql_util._deep_annotate(element, orm_adapt_flag, exclude)
def _orm_deannotate(element: _SA) -> _SA:
    """Strip the annotations that tie a column to a particular mapping
    ("_orm_adapt" and "parententity").

    Note this doesn't affect "remote" and "foreign" annotations
    passed by the :func:`_orm.foreign` and :func:`_orm.remote`
    annotators.

    """
    mapping_annotations = ("_orm_adapt", "parententity")
    return sql_util._deep_deannotate(element, values=mapping_annotations)
def _orm_full_deannotate(element: _SA) -> _SA:
    """Deep-deannotate ``element`` without restricting which annotation
    keys are removed."""
    stripped = sql_util._deep_deannotate(element)
    return stripped
class _ORMJoin(expression.Join):
    """Extend Join to support ORM constructs as input."""

    __visit_name__ = expression.Join.__visit_name__

    inherit_cache = True

    def __init__(
        self,
        left: _FromClauseArgument,
        right: _FromClauseArgument,
        onclause: Optional[_OnClauseArgument] = None,
        isouter: bool = False,
        full: bool = False,
        _left_memo: Optional[Any] = None,
        _right_memo: Optional[Any] = None,
        _extra_criteria: Tuple[ColumnElement[bool], ...] = (),
    ):
        """Construct the join.

        :param left: left side; inspected to obtain its selectable.
        :param right: right side; inspected to obtain its selectable.
        :param onclause: optional ON clause.  A relationship-bound
         attribute or a :class:`.MapperProperty` is expanded into the
         join condition(s) produced by the property's
         ``_create_joins()``.
        :param isouter: passed to the underlying ``Join``.
        :param full: passed to the underlying ``Join``.
        :param _left_memo: stored as ``_left_memo``.
        :param _right_memo: stored as ``_right_memo``.
        :param _extra_criteria: additional criteria; combined with any
         carried by ``onclause``.
        """
        left_info = cast(
            "Union[FromClause, _InternalEntityType[Any]]",
            inspection.inspect(left),
        )

        right_info = cast(
            "Union[FromClause, _InternalEntityType[Any]]",
            inspection.inspect(right),
        )
        adapt_to = right_info.selectable

        # used by joined eager loader
        self._left_memo = _left_memo
        self._right_memo = _right_memo

        # work out the relationship property (if any) behind the ON clause
        if isinstance(onclause, attributes.QueryableAttribute):
            if TYPE_CHECKING:
                assert isinstance(
                    onclause.comparator, RelationshipProperty.Comparator
                )
            on_selectable = onclause.comparator._source_selectable()
            prop = onclause.property
            _extra_criteria += onclause._extra_criteria
        elif isinstance(onclause, MapperProperty):
            # used internally by joined eager loader...possibly not ideal
            prop = onclause
            on_selectable = prop.parent.selectable
        else:
            prop = None
            on_selectable = None

        left_selectable = left_info.selectable
        if prop:
            adapt_from: Optional[FromClause]
            # adapt from the attribute's own selectable when it is present
            # in the left side, else from the left side as a whole
            if sql_util.clause_is_present(on_selectable, left_selectable):
                adapt_from = on_selectable
            else:
                assert isinstance(left_selectable, FromClause)
                adapt_from = left_selectable

            (
                pj,
                sj,
                source,
                dest,
                secondary,
                target_adapter,
            ) = prop._create_joins(
                source_selectable=adapt_from,
                dest_selectable=adapt_to,
                source_polymorphic=True,
                of_type_entity=right_info,
                alias_secondary=True,
                extra_criteria=_extra_criteria,
            )

            # a non-None ``sj`` means a ``secondary`` table takes part;
            # it is folded into the right side (outer join) or the left
            # side (otherwise)
            if sj is not None:
                if isouter:
                    # note this is an inner join from secondary->right
                    right = sql.join(secondary, right, sj)
                    onclause = pj
                else:
                    left = sql.join(left, secondary, pj, isouter)
                    onclause = sj
            else:
                onclause = pj

            self._target_adapter = target_adapter

        # we don't use the normal coercions logic for _ORMJoin
        # (probably should), so do some gymnastics to get the entity.
        # logic here is for #8721, which was a major bug in 1.4
        # for almost two years, not reported/fixed until 1.4.43 (!)
        if is_selectable(left_info):
            parententity = left_selectable._annotations.get(
                "parententity", None
            )
        elif insp_is_mapper(left_info) or insp_is_aliased_class(left_info):
            parententity = left_info
        else:
            parententity = None

        if parententity is not None:
            self._annotations = self._annotations.union(
                {"parententity": parententity}
            )

        # with a relationship property the extra criteria were already
        # handed to _create_joins(); otherwise they are ANDed in below
        augment_onclause = bool(_extra_criteria) and not prop
        expression.Join.__init__(self, left, right, onclause, isouter, full)

        assert self.onclause is not None

        if augment_onclause:
            self.onclause &= sql.and_(*_extra_criteria)

        if (
            not prop
            and getattr(right_info, "mapper", None)
            and right_info.mapper.single  # type: ignore
        ):
            right_info = cast("_InternalEntityType[Any]", right_info)
            # if single inheritance target and we are using a manual
            # or implicit ON clause, augment it the same way we'd augment the
            # WHERE.
            single_crit = right_info.mapper._single_table_criterion
            if single_crit is not None:
                if insp_is_aliased_class(right_info):
                    single_crit = right_info._adapter.traverse(single_crit)
                self.onclause = self.onclause & single_crit

    def _splice_into_center(self, other):
        """Splice a join into the center.

        Given join(a, b) and join(b, c), return join(a, b).join(c)

        """
        # walk down to the leftmost element of ``other``; it must be the
        # very object that forms this join's right side
        leftmost = other
        while isinstance(leftmost, sql.Join):
            leftmost = leftmost.left

        assert self.right is leftmost

        left = _ORMJoin(
            self.left,
            other.left,
            self.onclause,
            isouter=self.isouter,
            _left_memo=self._left_memo,
            _right_memo=None,
        )

        return _ORMJoin(
            left,
            other.right,
            other.onclause,
            isouter=other.isouter,
            _right_memo=other._right_memo,
        )

    def join(
        self,
        right: _FromClauseArgument,
        onclause: Optional[_OnClauseArgument] = None,
        isouter: bool = False,
        full: bool = False,
    ) -> _ORMJoin:
        """Return a new :class:`._ORMJoin` with this join as its left
        side."""
        return _ORMJoin(self, right, onclause, full=full, isouter=isouter)

    def outerjoin(
        self,
        right: _FromClauseArgument,
        onclause: Optional[_OnClauseArgument] = None,
        full: bool = False,
    ) -> _ORMJoin:
        """Same as :meth:`join` with ``isouter=True``."""
        return _ORMJoin(self, right, onclause, isouter=True, full=full)
def with_parent(
    instance: object,
    prop: attributes.QueryableAttribute[Any],
    from_entity: Optional[_EntityType[Any]] = None,
) -> ColumnElement[bool]:
    """Create filtering criterion that relates this query's primary entity
    to the given related instance, using established
    :func:`_orm.relationship()`
    configuration.

    E.g.::

        stmt = select(Address).where(with_parent(some_user, User.addresses))


    The SQL rendered is the same as that rendered when a lazy loader
    would fire off from the given parent on that attribute, meaning
    that the appropriate state is taken from the parent object in
    Python without the need to render joins to the parent table
    in the rendered statement.

    The given property may also make use of :meth:`_orm.PropComparator.of_type`
    to indicate the left side of the criteria::


        a1 = aliased(Address)
        a2 = aliased(Address)
        stmt = select(a1, a2).where(
            with_parent(u1, User.addresses.of_type(a2))
        )

    The above use is equivalent to using the
    :func:`_orm.with_parent.from_entity` argument::

        a1 = aliased(Address)
        a2 = aliased(Address)
        stmt = select(a1, a2).where(
            with_parent(u1, User.addresses, from_entity=a2)
        )

    :param instance:
      An instance which has some :func:`_orm.relationship`.

    :param prop:
      Class-bound attribute, which indicates
      what relationship from the instance should be used to reconcile the
      parent/child relationship.  A string is rejected.

    :param from_entity:
      Entity in which to consider as the left side.  This defaults to the
      "zero" entity of the :class:`_query.Query` itself.  It is overridden
      by the ``of_type()`` entity of ``prop`` when one is present.

      .. versionadded:: 1.2

    :raises ArgumentError: if ``prop`` is a string, or is a class-bound
      attribute that does not refer to a relationship.

    """
    if isinstance(prop, str):
        raise sa_exc.ArgumentError(
            "with_parent() accepts class-bound mapped attributes, not strings"
        )

    prop_t: RelationshipProperty[Any]

    if not isinstance(prop, attributes.QueryableAttribute):
        # anything else is used as the relationship property directly
        prop_t = prop
    else:
        if prop._of_type:
            from_entity = prop._of_type

        mapper_property = prop.property
        is_relationship = mapper_property is not None and prop_is_relationship(
            mapper_property
        )
        if not is_relationship:
            raise sa_exc.ArgumentError(
                f"Expected relationship property for with_parent(), "
                f"got {mapper_property}"
            )
        prop_t = mapper_property

    return prop_t._with_parent(instance, from_entity=from_entity)
def has_identity(object_: object) -> bool:
    """Return True if the given object has a database
    identity.

    This typically corresponds to the object being
    in either the persistent or detached state.

    The value is read from the ``has_identity`` attribute of the object's
    instance state.

    .. seealso::

        :func:`.was_deleted`

    """
    return attributes.instance_state(object_).has_identity
def was_deleted(object_: object) -> bool:
    """Return True if the given object was deleted
    within a session flush.

    This is regardless of whether or not the object is
    persistent or detached.

    The value is read from the ``was_deleted`` attribute of the object's
    instance state.

    .. seealso::

        :attr:`.InstanceState.was_deleted`

    """
    return attributes.instance_state(object_).was_deleted
def _entity_corresponds_to(
    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
) -> bool:
    """determine if 'given' corresponds to 'entity', in terms
    of an entity passed to Query that would match the same entity
    being referred to elsewhere in the query.

    Two aliased entities correspond when they share a base alias.  An
    aliased 'given' and a mapper 'entity' correspond by membership in
    'given's with-polymorphic mappers when 'given' uses the mapper path,
    and by identity otherwise.  Two mappers correspond when they have a
    common parent.

    """
    if insp_is_aliased_class(entity):
        if not insp_is_aliased_class(given):
            return False
        return entity._base_alias() is given._base_alias()

    if insp_is_aliased_class(given):
        if not given._use_mapper_path:
            return entity is given
        return entity in given.with_polymorphic_mappers

    assert insp_is_mapper(given)
    return entity.common_parent(given)
def _entity_corresponds_to_use_path_impl(
    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
) -> bool:
    """determine if 'given' corresponds to 'entity', in terms
    of a path of loader options where a mapped attribute is taken to
    be a member of a parent entity.

    e.g.::

        someoption(A).someoption(A.b)  # -> fn(A, A) -> True
        someoption(A).someoption(C.d)  # -> fn(A, C) -> False

        a1 = aliased(A)
        someoption(a1).someoption(A.b)  # -> fn(a1, A) -> False
        someoption(a1).someoption(a1.b)  # -> fn(a1, a1) -> True

        wp = with_polymorphic(A, [A1, A2])
        someoption(wp).someoption(A1.foo)  # -> fn(wp, A1) -> False
        someoption(wp).someoption(wp.A1.foo)  # -> fn(wp, wp.A1) -> True


    """
    if not insp_is_aliased_class(given):
        # 'given' is a plain mapper
        if insp_is_aliased_class(entity):
            return (
                entity._use_mapper_path
                and given in entity.with_polymorphic_mappers
            )
        return given.isa(entity.mapper)

    # 'given' is aliased: 'entity' must be an alias with its own path
    # that is either the same object or one of given's sub-entities
    return (
        insp_is_aliased_class(entity)
        and not entity._use_mapper_path
        and (given is entity or entity in given._with_polymorphic_entities)
    )
2150def _entity_isa(given: _InternalEntityType[Any], mapper: Mapper[Any]) -> bool:
2151 """determine if 'given' "is a" mapper, in terms of the given
2152 would load rows of type 'mapper'.
2154 """
2155 if given.is_aliased_class:
2156 return mapper in given.with_polymorphic_mappers or given.mapper.isa(
2157 mapper
2158 )
2159 elif given.with_polymorphic_mappers:
2160 return mapper in given.with_polymorphic_mappers or given.isa(mapper)
2161 else:
2162 return given.isa(mapper)
2165def _getitem(iterable_query: Query[Any], item: Any) -> Any:
2166 """calculate __getitem__ in terms of an iterable query object
2167 that also has a slice() method.
2169 """
2171 def _no_negative_indexes():
2172 raise IndexError(
2173 "negative indexes are not accepted by SQL "
2174 "index / slice operators"
2175 )
2177 if isinstance(item, slice):
2178 start, stop, step = util.decode_slice(item)
2180 if (
2181 isinstance(stop, int)
2182 and isinstance(start, int)
2183 and stop - start <= 0
2184 ):
2185 return []
2187 elif (isinstance(start, int) and start < 0) or (
2188 isinstance(stop, int) and stop < 0
2189 ):
2190 _no_negative_indexes()
2192 res = iterable_query.slice(start, stop)
2193 if step is not None:
2194 return list(res)[None : None : item.step]
2195 else:
2196 return list(res)
2197 else:
2198 if item == -1:
2199 _no_negative_indexes()
2200 else:
2201 return list(iterable_query[item : item + 1])[0]
def _is_mapped_annotation(
    raw_annotation: _AnnotationScanType,
    cls: Type[Any],
    originating_cls: Type[Any],
) -> bool:
    """Tell whether ``raw_annotation`` resolves to a mapped annotation.

    The annotation is de-stringified against the module of
    ``originating_cls`` and then checked with ``is_origin_of_cls()``
    against ``_MappedAnnotationBase``.  An annotation whose names cannot
    be resolved (``NameError``) is reported as not mapped.

    """
    try:
        resolved = de_stringify_annotation(
            cls, raw_annotation, originating_cls.__module__
        )
    except NameError:
        # in most cases, at least within our own tests, we can raise
        # here, which is more accurate as it prevents us from returning
        # false negatives.  However, in the real world, try to avoid getting
        # involved with end-user annotations that have nothing to do with us.
        # see issue #8888 where we bypass using this function in the case
        # that we want to detect an unresolvable Mapped[] type.
        return False

    return is_origin_of_cls(resolved, _MappedAnnotationBase)
2225class _CleanupError(Exception):
2226 pass
2229def _cleanup_mapped_str_annotation(
2230 annotation: str, originating_module: str
2231) -> str:
2232 # fix up an annotation that comes in as the form:
2233 # 'Mapped[List[Address]]' so that it instead looks like:
2234 # 'Mapped[List["Address"]]' , which will allow us to get
2235 # "Address" as a string
2237 # additionally, resolve symbols for these names since this is where
2238 # we'd have to do it
2240 inner: Optional[Match[str]]
2242 mm = re.match(r"^(.+?)\[(.+)\]$", annotation)
2244 if not mm:
2245 return annotation
2247 # ticket #8759. Resolve the Mapped name to a real symbol.
2248 # originally this just checked the name.
2249 try:
2250 obj = eval_name_only(mm.group(1), originating_module)
2251 except NameError as ne:
2252 raise _CleanupError(
2253 f'For annotation "{annotation}", could not resolve '
2254 f'container type "{mm.group(1)}". '
2255 "Please ensure this type is imported at the module level "
2256 "outside of TYPE_CHECKING blocks"
2257 ) from ne
2259 if obj is typing.ClassVar:
2260 real_symbol = "ClassVar"
2261 else:
2262 try:
2263 if issubclass(obj, _MappedAnnotationBase):
2264 real_symbol = obj.__name__
2265 else:
2266 return annotation
2267 except TypeError:
2268 # avoid isinstance(obj, type) check, just catch TypeError
2269 return annotation
2271 # note: if one of the codepaths above didn't define real_symbol and
2272 # then didn't return, real_symbol raises UnboundLocalError
2273 # which is actually a NameError, and the calling routines don't
2274 # notice this since they are catching NameError anyway. Just in case
2275 # this is being modified in the future, something to be aware of.
2277 stack = []
2278 inner = mm
2279 while True:
2280 stack.append(real_symbol if mm is inner else inner.group(1))
2281 g2 = inner.group(2)
2282 inner = re.match(r"^(.+?)\[(.+)\]$", g2)
2283 if inner is None:
2284 stack.append(g2)
2285 break
2287 # stacks we want to rewrite, that is, quote the last entry which
2288 # we think is a relationship class name:
2289 #
2290 # ['Mapped', 'List', 'Address']
2291 # ['Mapped', 'A']
2292 #
2293 # stacks we dont want to rewrite, which are generally MappedColumn
2294 # use cases:
2295 #
2296 # ['Mapped', "'Optional[Dict[str, str]]'"]
2297 # ['Mapped', 'dict[str, str] | None']
2299 if (
2300 # avoid already quoted symbols such as
2301 # ['Mapped', "'Optional[Dict[str, str]]'"]
2302 not re.match(r"""^["'].*["']$""", stack[-1])
2303 # avoid further generics like Dict[] such as
2304 # ['Mapped', 'dict[str, str] | None']
2305 and not re.match(r".*\[.*\]", stack[-1])
2306 ):
2307 stripchars = "\"' "
2308 stack[-1] = ", ".join(
2309 f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",")
2310 )
2312 annotation = "[".join(stack) + ("]" * (len(stack) - 1))
2314 return annotation
2317def _extract_mapped_subtype(
2318 raw_annotation: Optional[_AnnotationScanType],
2319 cls: type,
2320 originating_module: str,
2321 key: str,
2322 attr_cls: Type[Any],
2323 required: bool,
2324 is_dataclass_field: bool,
2325 expect_mapped: bool = True,
2326 raiseerr: bool = True,
2327) -> Optional[Tuple[Union[_AnnotationScanType, str], Optional[type]]]:
2328 """given an annotation, figure out if it's ``Mapped[something]`` and if
2329 so, return the ``something`` part.
2331 Includes error raise scenarios and other options.
2333 """
2335 if raw_annotation is None:
2336 if required:
2337 raise sa_exc.ArgumentError(
2338 f"Python typing annotation is required for attribute "
2339 f'"{cls.__name__}.{key}" when primary argument(s) for '
2340 f'"{attr_cls.__name__}" construct are None or not present'
2341 )
2342 return None
2344 try:
2345 annotated = de_stringify_annotation(
2346 cls,
2347 raw_annotation,
2348 originating_module,
2349 str_cleanup_fn=_cleanup_mapped_str_annotation,
2350 )
2351 except _CleanupError as ce:
2352 raise sa_exc.ArgumentError(
2353 f"Could not interpret annotation {raw_annotation}. "
2354 "Check that it uses names that are correctly imported at the "
2355 "module level. See chained stack trace for more hints."
2356 ) from ce
2357 except NameError as ne:
2358 if raiseerr and "Mapped[" in raw_annotation: # type: ignore
2359 raise sa_exc.ArgumentError(
2360 f"Could not interpret annotation {raw_annotation}. "
2361 "Check that it uses names that are correctly imported at the "
2362 "module level. See chained stack trace for more hints."
2363 ) from ne
2365 annotated = raw_annotation # type: ignore
2367 if is_dataclass_field:
2368 return annotated, None
2369 else:
2370 if not hasattr(annotated, "__origin__") or not is_origin_of_cls(
2371 annotated, _MappedAnnotationBase
2372 ):
2373 if expect_mapped:
2374 if not raiseerr:
2375 return None
2377 origin = getattr(annotated, "__origin__", None)
2378 if origin is typing.ClassVar:
2379 return None
2381 # check for other kind of ORM descriptor like AssociationProxy,
2382 # don't raise for that (issue #9957)
2383 elif isinstance(origin, type) and issubclass(
2384 origin, ORMDescriptor
2385 ):
2386 return None
2388 raise sa_exc.ArgumentError(
2389 f'Type annotation for "{cls.__name__}.{key}" '
2390 "can't be correctly interpreted for "
2391 "Annotated Declarative Table form. ORM annotations "
2392 "should normally make use of the ``Mapped[]`` generic "
2393 "type, or other ORM-compatible generic type, as a "
2394 "container for the actual type, which indicates the "
2395 "intent that the attribute is mapped. "
2396 "Class variables that are not intended to be mapped "
2397 "by the ORM should use ClassVar[]. "
2398 "To allow Annotated Declarative to disregard legacy "
2399 "annotations which don't use Mapped[] to pass, set "
2400 '"__allow_unmapped__ = True" on the class or a '
2401 "superclass this class.",
2402 code="zlpr",
2403 )
2405 else:
2406 return annotated, None
2408 if len(annotated.__args__) != 1:
2409 raise sa_exc.ArgumentError(
2410 "Expected sub-type for Mapped[] annotation"
2411 )
2413 return (
2414 # fix dict/list/set args to be ForwardRef, see #11814
2415 fixup_container_fwd_refs(annotated.__args__[0]),
2416 annotated.__origin__,
2417 )
def _mapper_property_as_plain_name(prop: Type[Any]) -> str:
    """Return the plain name of ``prop`` as produced by
    ``util.clsname_as_plain_name()``.

    When ``prop`` has a ``_mapper_property_name`` attribute, the result
    of calling it is passed along as the name to use; otherwise ``None``
    is passed.

    """
    explicit_name = (
        prop._mapper_property_name()
        if hasattr(prop, "_mapper_property_name")
        else None
    )
    return util.clsname_as_plain_name(prop, explicit_name)