1# orm/util.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9from __future__ import annotations
10
11import enum
12import functools
13import re
14import types
15import typing
16from typing import AbstractSet
17from typing import Any
18from typing import Callable
19from typing import cast
20from typing import Dict
21from typing import FrozenSet
22from typing import Generic
23from typing import get_origin
24from typing import Iterable
25from typing import Iterator
26from typing import List
27from typing import Literal
28from typing import Match
29from typing import Optional
30from typing import Protocol
31from typing import Sequence
32from typing import Tuple
33from typing import Type
34from typing import TYPE_CHECKING
35from typing import TypeVar
36from typing import Union
37import weakref
38
39from . import attributes # noqa
40from . import exc
41from . import exc as orm_exc
42from ._typing import _O
43from ._typing import insp_is_aliased_class
44from ._typing import insp_is_mapper
45from ._typing import prop_is_relationship
46from .base import _class_to_mapper as _class_to_mapper
47from .base import _MappedAnnotationBase
48from .base import _never_set as _never_set # noqa: F401
49from .base import _none_only_set as _none_only_set # noqa: F401
50from .base import _none_set as _none_set # noqa: F401
51from .base import attribute_str as attribute_str # noqa: F401
52from .base import class_mapper as class_mapper
53from .base import DynamicMapped
54from .base import InspectionAttr as InspectionAttr
55from .base import instance_str as instance_str # noqa: F401
56from .base import Mapped
57from .base import object_mapper as object_mapper
58from .base import object_state as object_state # noqa: F401
59from .base import opt_manager_of_class
60from .base import ORMDescriptor
61from .base import state_attribute_str as state_attribute_str # noqa: F401
62from .base import state_class_str as state_class_str # noqa: F401
63from .base import state_str as state_str # noqa: F401
64from .base import WriteOnlyMapped
65from .interfaces import CriteriaOption
66from .interfaces import MapperProperty as MapperProperty
67from .interfaces import ORMColumnsClauseRole
68from .interfaces import ORMEntityColumnsClauseRole
69from .interfaces import ORMFromClauseRole
70from .path_registry import PathRegistry as PathRegistry
71from .. import event
72from .. import exc as sa_exc
73from .. import inspection
74from .. import sql
75from .. import util
76from ..engine.result import result_tuple
77from ..sql import coercions
78from ..sql import expression
79from ..sql import lambdas
80from ..sql import roles
81from ..sql import util as sql_util
82from ..sql import visitors
83from ..sql._typing import is_selectable
84from ..sql.annotation import SupportsCloneAnnotations
85from ..sql.base import ColumnCollection
86from ..sql.cache_key import HasCacheKey
87from ..sql.cache_key import MemoizedHasCacheKey
88from ..sql.elements import ColumnElement
89from ..sql.elements import KeyedColumnElement
90from ..sql.selectable import FromClause
91from ..util.langhelpers import MemoizedSlots
92from ..util.typing import de_stringify_annotation as _de_stringify_annotation
93from ..util.typing import eval_name_only as _eval_name_only
94from ..util.typing import fixup_container_fwd_refs
95from ..util.typing import GenericProtocol
96from ..util.typing import is_origin_of_cls
97from ..util.typing import TupleAny
98from ..util.typing import Unpack
99
100if typing.TYPE_CHECKING:
101 from ._typing import _EntityType
102 from ._typing import _IdentityKeyType
103 from ._typing import _InternalEntityType
104 from ._typing import _ORMCOLEXPR
105 from .context import _MapperEntity
106 from .context import _ORMCompileState
107 from .mapper import Mapper
108 from .path_registry import _AbstractEntityRegistry
109 from .query import Query
110 from .relationships import RelationshipProperty
111 from ..engine import Row
112 from ..engine import RowMapping
113 from ..sql._typing import _CE
114 from ..sql._typing import _ColumnExpressionArgument
115 from ..sql._typing import _EquivalentColumnMap
116 from ..sql._typing import _FromClauseArgument
117 from ..sql._typing import _OnClauseArgument
118 from ..sql._typing import _PropagateAttrsType
119 from ..sql.annotation import _SA
120 from ..sql.base import ReadOnlyColumnCollection
121 from ..sql.elements import BindParameter
122 from ..sql.selectable import _ColumnsClauseElement
123 from ..sql.selectable import Select
124 from ..sql.selectable import Selectable
125 from ..sql.visitors import anon_map
126 from ..util.typing import _AnnotationScanType
127 from ..util.typing import _MatchedOnType
128
129_T = TypeVar("_T", bound=Any)
130
131all_cascades = frozenset(
132 (
133 "delete",
134 "delete-orphan",
135 "all",
136 "merge",
137 "expunge",
138 "save-update",
139 "refresh-expire",
140 "none",
141 )
142)
143
144_de_stringify_partial = functools.partial(
145 functools.partial,
146 locals_=util.immutabledict(
147 {
148 "Mapped": Mapped,
149 "WriteOnlyMapped": WriteOnlyMapped,
150 "DynamicMapped": DynamicMapped,
151 }
152 ),
153)
154
155# partial is practically useless as we have to write out the whole
156# function and maintain the signature anyway
157
158
159class _DeStringifyAnnotation(Protocol):
160 def __call__(
161 self,
162 cls: Type[Any],
163 annotation: _AnnotationScanType,
164 originating_module: str,
165 *,
166 str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
167 include_generic: bool = False,
168 ) -> _MatchedOnType: ...
169
170
171de_stringify_annotation = cast(
172 _DeStringifyAnnotation, _de_stringify_partial(_de_stringify_annotation)
173)
174
175
176class _EvalNameOnly(Protocol):
177 def __call__(self, name: str, module_name: str) -> Any: ...
178
179
180eval_name_only = cast(_EvalNameOnly, _de_stringify_partial(_eval_name_only))
181
182
183class CascadeOptions(FrozenSet[str]):
184 """Keeps track of the options sent to
185 :paramref:`.relationship.cascade`"""
186
187 _add_w_all_cascades = all_cascades.difference(
188 ["all", "none", "delete-orphan"]
189 )
190 _allowed_cascades = all_cascades
191
192 _viewonly_cascades = ["expunge", "all", "none", "refresh-expire", "merge"]
193
194 __slots__ = (
195 "save_update",
196 "delete",
197 "refresh_expire",
198 "merge",
199 "expunge",
200 "delete_orphan",
201 )
202
203 save_update: bool
204 delete: bool
205 refresh_expire: bool
206 merge: bool
207 expunge: bool
208 delete_orphan: bool
209
210 def __new__(
211 cls, value_list: Optional[Union[Iterable[str], str]]
212 ) -> CascadeOptions:
213 if isinstance(value_list, str) or value_list is None:
214 return cls.from_string(value_list) # type: ignore
215 values = set(value_list)
216 if values.difference(cls._allowed_cascades):
217 raise sa_exc.ArgumentError(
218 "Invalid cascade option(s): %s"
219 % ", ".join(
220 [
221 repr(x)
222 for x in sorted(
223 values.difference(cls._allowed_cascades)
224 )
225 ]
226 )
227 )
228
229 if "all" in values:
230 values.update(cls._add_w_all_cascades)
231 if "none" in values:
232 values.clear()
233 values.discard("all")
234
235 self = super().__new__(cls, values)
236 self.save_update = "save-update" in values
237 self.delete = "delete" in values
238 self.refresh_expire = "refresh-expire" in values
239 self.merge = "merge" in values
240 self.expunge = "expunge" in values
241 self.delete_orphan = "delete-orphan" in values
242
243 if self.delete_orphan and not self.delete:
244 util.warn("The 'delete-orphan' cascade option requires 'delete'.")
245 return self
246
247 def __repr__(self):
248 return "CascadeOptions(%r)" % (",".join([x for x in sorted(self)]))
249
250 @classmethod
251 def from_string(cls, arg):
252 values = [c for c in re.split(r"\s*,\s*", arg or "") if c]
253 return cls(values)
254
255
256def _validator_events(desc, key, validator, include_removes, include_backrefs):
257 """Runs a validation method on an attribute value to be set or
258 appended.
259 """
260
261 if not include_backrefs:
262
263 def detect_is_backref(state, initiator):
264 impl = state.manager[key].impl
265 return initiator.impl is not impl
266
267 if include_removes:
268
269 def append(state, value, initiator):
270 if initiator.op is not attributes.OP_BULK_REPLACE and (
271 include_backrefs or not detect_is_backref(state, initiator)
272 ):
273 return validator(state.obj(), key, value, False)
274 else:
275 return value
276
277 def bulk_set(state, values, initiator):
278 if include_backrefs or not detect_is_backref(state, initiator):
279 obj = state.obj()
280 values[:] = [
281 validator(obj, key, value, False) for value in values
282 ]
283
284 def set_(state, value, oldvalue, initiator):
285 if include_backrefs or not detect_is_backref(state, initiator):
286 return validator(state.obj(), key, value, False)
287 else:
288 return value
289
290 def remove(state, value, initiator):
291 if include_backrefs or not detect_is_backref(state, initiator):
292 validator(state.obj(), key, value, True)
293
294 else:
295
296 def append(state, value, initiator):
297 if initiator.op is not attributes.OP_BULK_REPLACE and (
298 include_backrefs or not detect_is_backref(state, initiator)
299 ):
300 return validator(state.obj(), key, value)
301 else:
302 return value
303
304 def bulk_set(state, values, initiator):
305 if include_backrefs or not detect_is_backref(state, initiator):
306 obj = state.obj()
307 values[:] = [validator(obj, key, value) for value in values]
308
309 def set_(state, value, oldvalue, initiator):
310 if include_backrefs or not detect_is_backref(state, initiator):
311 return validator(state.obj(), key, value)
312 else:
313 return value
314
315 event.listen(desc, "append", append, raw=True, retval=True)
316 event.listen(desc, "bulk_replace", bulk_set, raw=True)
317 event.listen(desc, "set", set_, raw=True, retval=True)
318 if include_removes:
319 event.listen(desc, "remove", remove, raw=True, retval=True)
320
321
322def polymorphic_union(
323 table_map, typecolname, aliasname="p_union", cast_nulls=True
324):
325 """Create a ``UNION`` statement used by a polymorphic mapper.
326
327 See :ref:`concrete_inheritance` for an example of how
328 this is used.
329
330 :param table_map: mapping of polymorphic identities to
331 :class:`_schema.Table` objects.
332 :param typecolname: string name of a "discriminator" column, which will be
333 derived from the query, producing the polymorphic identity for
334 each row. If ``None``, no polymorphic discriminator is generated.
335 :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()`
336 construct generated.
337 :param cast_nulls: if True, non-existent columns, which are represented
338 as labeled NULLs, will be passed into CAST. This is a legacy behavior
339 that is problematic on some backends such as Oracle - in which case it
340 can be set to False.
341
342 """
343
344 colnames: util.OrderedSet[str] = util.OrderedSet()
345 colnamemaps = {}
346 types = {}
347 for key in table_map:
348 table = table_map[key]
349
350 table = coercions.expect(roles.FromClauseRole, table)
351 table_map[key] = table
352
353 m = {}
354 for c in table.c:
355 if c.key == typecolname:
356 raise sa_exc.InvalidRequestError(
357 "Polymorphic union can't use '%s' as the discriminator "
358 "column due to mapped column %r; please apply the "
359 "'typecolname' "
360 "argument; this is available on "
361 "ConcreteBase as '_concrete_discriminator_name'"
362 % (typecolname, c)
363 )
364 colnames.add(c.key)
365 m[c.key] = c
366 types[c.key] = c.type
367 colnamemaps[table] = m
368
369 def col(name, table):
370 try:
371 return colnamemaps[table][name]
372 except KeyError:
373 if cast_nulls:
374 return sql.cast(sql.null(), types[name]).label(name)
375 else:
376 return sql.type_coerce(sql.null(), types[name]).label(name)
377
378 result = []
379 for type_, table in table_map.items():
380 if typecolname is not None:
381 result.append(
382 sql.select(
383 *(
384 [col(name, table) for name in colnames]
385 + [
386 sql.literal_column(
387 sql_util._quote_ddl_expr(type_)
388 ).label(typecolname)
389 ]
390 )
391 ).select_from(table)
392 )
393 else:
394 result.append(
395 sql.select(
396 *[col(name, table) for name in colnames]
397 ).select_from(table)
398 )
399 return sql.union_all(*result).alias(aliasname)
400
401
402def identity_key(
403 class_: Optional[Type[_T]] = None,
404 ident: Union[Any, Tuple[Any, ...]] = None,
405 *,
406 instance: Optional[_T] = None,
407 row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None,
408 identity_token: Optional[Any] = None,
409) -> _IdentityKeyType[_T]:
410 r"""Generate "identity key" tuples, as are used as keys in the
411 :attr:`.Session.identity_map` dictionary.
412
413 This function has several call styles:
414
415 * ``identity_key(class, ident, identity_token=token)``
416
417 This form receives a mapped class and a primary key scalar or
418 tuple as an argument.
419
420 E.g.::
421
422 >>> identity_key(MyClass, (1, 2))
423 (<class '__main__.MyClass'>, (1, 2), None)
424
425 :param class: mapped class (must be a positional argument)
426 :param ident: primary key, may be a scalar or tuple argument.
427 :param identity_token: optional identity token
428
429 * ``identity_key(instance=instance)``
430
431 This form will produce the identity key for a given instance. The
432 instance need not be persistent, only that its primary key attributes
433 are populated (else the key will contain ``None`` for those missing
434 values).
435
436 E.g.::
437
438 >>> instance = MyClass(1, 2)
439 >>> identity_key(instance=instance)
440 (<class '__main__.MyClass'>, (1, 2), None)
441
442 In this form, the given instance is ultimately run though
443 :meth:`_orm.Mapper.identity_key_from_instance`, which will have the
444 effect of performing a database check for the corresponding row
445 if the object is expired.
446
447 :param instance: object instance (must be given as a keyword arg)
448
449 * ``identity_key(class, row=row, identity_token=token)``
450
451 This form is similar to the class/tuple form, except is passed a
452 database result row as a :class:`.Row` or :class:`.RowMapping` object.
453
454 E.g.::
455
456 >>> row = engine.execute(text("select * from table where a=1 and b=2")).first()
457 >>> identity_key(MyClass, row=row)
458 (<class '__main__.MyClass'>, (1, 2), None)
459
460 :param class: mapped class (must be a positional argument)
461 :param row: :class:`.Row` row returned by a :class:`_engine.CursorResult`
462 (must be given as a keyword arg)
463 :param identity_token: optional identity token
464
465 """ # noqa: E501
466 if class_ is not None:
467 mapper = class_mapper(class_)
468 if row is None:
469 if ident is None:
470 raise sa_exc.ArgumentError("ident or row is required")
471 return mapper.identity_key_from_primary_key(
472 tuple(util.to_list(ident)), identity_token=identity_token
473 )
474 else:
475 return mapper.identity_key_from_row(
476 row, identity_token=identity_token
477 )
478 elif instance is not None:
479 mapper = object_mapper(instance)
480 return mapper.identity_key_from_instance(instance)
481 else:
482 raise sa_exc.ArgumentError("class or instance is required")
483
484
485class _TraceAdaptRole(enum.Enum):
486 """Enumeration of all the use cases for ORMAdapter.
487
488 ORMAdapter remains one of the most complicated aspects of the ORM, as it is
489 used for in-place adaption of column expressions to be applied to a SELECT,
490 replacing :class:`.Table` and other objects that are mapped to classes with
491 aliases of those tables in the case of joined eager loading, or in the case
492 of polymorphic loading as used with concrete mappings or other custom "with
493 polymorphic" parameters, with whole user-defined subqueries. The
494 enumerations provide an overview of all the use cases used by ORMAdapter, a
495 layer of formality as to the introduction of new ORMAdapter use cases (of
496 which none are anticipated), as well as a means to trace the origins of a
497 particular ORMAdapter within runtime debugging.
498
499 SQLAlchemy 2.0 has greatly scaled back ORM features which relied heavily on
500 open-ended statement adaption, including the ``Query.with_polymorphic()``
501 method and the ``Query.select_from_entity()`` methods, favoring
502 user-explicit aliasing schemes using the ``aliased()`` and
503 ``with_polymorphic()`` standalone constructs; these still use adaption,
504 however the adaption is applied in a narrower scope.
505
506 """
507
508 # aliased() use that is used to adapt individual attributes at query
509 # construction time
510 ALIASED_INSP = enum.auto()
511
512 # joinedload cases; typically adapt an ON clause of a relationship
513 # join
514 JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto()
515 JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto()
516 JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto()
517
518 # polymorphic cases - these are complex ones that replace FROM
519 # clauses, replacing tables with subqueries
520 MAPPER_POLYMORPHIC_ADAPTER = enum.auto()
521 WITH_POLYMORPHIC_ADAPTER = enum.auto()
522 WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto()
523 DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto()
524
525 # the from_statement() case, used only to adapt individual attributes
526 # from a given statement to local ORM attributes at result fetching
527 # time. assigned to ORMCompileState._from_obj_alias
528 ADAPT_FROM_STATEMENT = enum.auto()
529
530 # the joinedload for queries that have LIMIT/OFFSET/DISTINCT case;
531 # the query is placed inside of a subquery with the LIMIT/OFFSET/etc.,
532 # joinedloads are then placed on the outside.
533 # assigned to ORMCompileState.compound_eager_adapter
534 COMPOUND_EAGER_STATEMENT = enum.auto()
535
536 # the legacy Query._set_select_from() case.
537 # this is needed for Query's set operations (i.e. UNION, etc. )
538 # as well as "legacy from_self()", which while removed from 2.0 as
539 # public API, is used for the Query.count() method. this one
540 # still does full statement traversal
541 # assigned to ORMCompileState._from_obj_alias
542 LEGACY_SELECT_FROM_ALIAS = enum.auto()
543
544
545class ORMStatementAdapter(sql_util.ColumnAdapter):
546 """ColumnAdapter which includes a role attribute."""
547
548 __slots__ = ("role",)
549
550 def __init__(
551 self,
552 role: _TraceAdaptRole,
553 selectable: Selectable,
554 *,
555 equivalents: Optional[_EquivalentColumnMap] = None,
556 adapt_required: bool = False,
557 allow_label_resolve: bool = True,
558 anonymize_labels: bool = False,
559 adapt_on_names: bool = False,
560 adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
561 ):
562 self.role = role
563 super().__init__(
564 selectable,
565 equivalents=equivalents,
566 adapt_required=adapt_required,
567 allow_label_resolve=allow_label_resolve,
568 anonymize_labels=anonymize_labels,
569 adapt_on_names=adapt_on_names,
570 adapt_from_selectables=adapt_from_selectables,
571 )
572
573
574class ORMAdapter(sql_util.ColumnAdapter):
575 """ColumnAdapter subclass which excludes adaptation of entities from
576 non-matching mappers.
577
578 """
579
580 __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp")
581
582 is_aliased_class: bool
583 aliased_insp: Optional[AliasedInsp[Any]]
584
585 def __init__(
586 self,
587 role: _TraceAdaptRole,
588 entity: _InternalEntityType[Any],
589 *,
590 equivalents: Optional[_EquivalentColumnMap] = None,
591 adapt_required: bool = False,
592 allow_label_resolve: bool = True,
593 anonymize_labels: bool = False,
594 selectable: Optional[Selectable] = None,
595 limit_on_entity: bool = True,
596 adapt_on_names: bool = False,
597 adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
598 ):
599 self.role = role
600 self.mapper = entity.mapper
601 if selectable is None:
602 selectable = entity.selectable
603 if insp_is_aliased_class(entity):
604 self.is_aliased_class = True
605 self.aliased_insp = entity
606 else:
607 self.is_aliased_class = False
608 self.aliased_insp = None
609
610 super().__init__(
611 selectable,
612 equivalents,
613 adapt_required=adapt_required,
614 allow_label_resolve=allow_label_resolve,
615 anonymize_labels=anonymize_labels,
616 include_fn=self._include_fn if limit_on_entity else None,
617 adapt_on_names=adapt_on_names,
618 adapt_from_selectables=adapt_from_selectables,
619 )
620
621 def _include_fn(self, elem):
622 entity = elem._annotations.get("parentmapper", None)
623
624 return not entity or entity.isa(self.mapper) or self.mapper.isa(entity)
625
626
627class AliasedClass(
628 inspection.Inspectable["AliasedInsp[_O]"], ORMColumnsClauseRole[_O]
629):
630 r"""Represents an "aliased" form of a mapped class for usage with Query.
631
632 The ORM equivalent of a :func:`~sqlalchemy.sql.expression.alias`
633 construct, this object mimics the mapped class using a
634 ``__getattr__`` scheme and maintains a reference to a
635 real :class:`~sqlalchemy.sql.expression.Alias` object.
636
637 A primary purpose of :class:`.AliasedClass` is to serve as an alternate
638 within a SQL statement generated by the ORM, such that an existing
639 mapped entity can be used in multiple contexts. A simple example::
640
641 # find all pairs of users with the same name
642 user_alias = aliased(User)
643 session.query(User, user_alias).join(
644 (user_alias, User.id > user_alias.id)
645 ).filter(User.name == user_alias.name)
646
647 :class:`.AliasedClass` is also capable of mapping an existing mapped
648 class to an entirely new selectable, provided this selectable is column-
649 compatible with the existing mapped selectable, and it can also be
650 configured in a mapping as the target of a :func:`_orm.relationship`.
651 See the links below for examples.
652
653 The :class:`.AliasedClass` object is constructed typically using the
654 :func:`_orm.aliased` function. It also is produced with additional
655 configuration when using the :func:`_orm.with_polymorphic` function.
656
657 The resulting object is an instance of :class:`.AliasedClass`.
658 This object implements an attribute scheme which produces the
659 same attribute and method interface as the original mapped
660 class, allowing :class:`.AliasedClass` to be compatible
661 with any attribute technique which works on the original class,
662 including hybrid attributes (see :ref:`hybrids_toplevel`).
663
664 The :class:`.AliasedClass` can be inspected for its underlying
665 :class:`_orm.Mapper`, aliased selectable, and other information
666 using :func:`_sa.inspect`::
667
668 from sqlalchemy import inspect
669
670 my_alias = aliased(MyClass)
671 insp = inspect(my_alias)
672
673 The resulting inspection object is an instance of :class:`.AliasedInsp`.
674
675
676 .. seealso::
677
678 :func:`.aliased`
679
680 :func:`.with_polymorphic`
681
682 :ref:`relationship_aliased_class`
683
684 :ref:`relationship_to_window_function`
685
686
687 """
688
689 __name__: str
690
691 def __init__(
692 self,
693 mapped_class_or_ac: _EntityType[_O],
694 alias: Optional[FromClause] = None,
695 name: Optional[str] = None,
696 flat: bool = False,
697 adapt_on_names: bool = False,
698 with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]] = None,
699 with_polymorphic_discriminator: Optional[ColumnElement[Any]] = None,
700 base_alias: Optional[AliasedInsp[Any]] = None,
701 use_mapper_path: bool = False,
702 represents_outer_join: bool = False,
703 ):
704 insp = cast(
705 "_InternalEntityType[_O]", inspection.inspect(mapped_class_or_ac)
706 )
707 mapper = insp.mapper
708
709 nest_adapters = False
710
711 if alias is None:
712 if insp.is_aliased_class and insp.selectable._is_subquery:
713 alias = insp.selectable.alias()
714 else:
715 alias = (
716 mapper._with_polymorphic_selectable._anonymous_fromclause(
717 name=name,
718 flat=flat,
719 )
720 )
721 elif insp.is_aliased_class:
722 nest_adapters = True
723
724 assert alias is not None
725 self._aliased_insp = AliasedInsp(
726 self,
727 insp,
728 alias,
729 name,
730 (
731 with_polymorphic_mappers
732 if with_polymorphic_mappers
733 else mapper.with_polymorphic_mappers
734 ),
735 (
736 with_polymorphic_discriminator
737 if with_polymorphic_discriminator is not None
738 else mapper.polymorphic_on
739 ),
740 base_alias,
741 use_mapper_path,
742 adapt_on_names,
743 represents_outer_join,
744 nest_adapters,
745 )
746
747 self.__name__ = f"aliased({mapper.class_.__name__})"
748
749 @classmethod
750 def _reconstitute_from_aliased_insp(
751 cls, aliased_insp: AliasedInsp[_O]
752 ) -> AliasedClass[_O]:
753 obj = cls.__new__(cls)
754 obj.__name__ = f"aliased({aliased_insp.mapper.class_.__name__})"
755 obj._aliased_insp = aliased_insp
756
757 if aliased_insp._is_with_polymorphic:
758 for sub_aliased_insp in aliased_insp._with_polymorphic_entities:
759 if sub_aliased_insp is not aliased_insp:
760 ent = AliasedClass._reconstitute_from_aliased_insp(
761 sub_aliased_insp
762 )
763 setattr(obj, sub_aliased_insp.class_.__name__, ent)
764
765 return obj
766
767 def __getattr__(self, key: str) -> Any:
768 try:
769 _aliased_insp = self.__dict__["_aliased_insp"]
770 except KeyError:
771 raise AttributeError()
772 else:
773 target = _aliased_insp._target
774 # maintain all getattr mechanics
775 attr = getattr(target, key)
776
777 # attribute is a method, that will be invoked against a
778 # "self"; so just return a new method with the same function and
779 # new self
780 if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
781 return types.MethodType(attr.__func__, self)
782
783 # attribute is a descriptor, that will be invoked against a
784 # "self"; so invoke the descriptor against this self
785 if hasattr(attr, "__get__"):
786 attr = attr.__get__(None, self)
787
788 # attributes within the QueryableAttribute system will want this
789 # to be invoked so the object can be adapted
790 if hasattr(attr, "adapt_to_entity"):
791 attr = attr.adapt_to_entity(_aliased_insp)
792 setattr(self, key, attr)
793
794 return attr
795
796 def _get_from_serialized(
797 self, key: str, mapped_class: _O, aliased_insp: AliasedInsp[_O]
798 ) -> Any:
799 # this method is only used in terms of the
800 # sqlalchemy.ext.serializer extension
801 attr = getattr(mapped_class, key)
802 if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
803 return types.MethodType(attr.__func__, self)
804
805 # attribute is a descriptor, that will be invoked against a
806 # "self"; so invoke the descriptor against this self
807 if hasattr(attr, "__get__"):
808 attr = attr.__get__(None, self)
809
810 # attributes within the QueryableAttribute system will want this
811 # to be invoked so the object can be adapted
812 if hasattr(attr, "adapt_to_entity"):
813 aliased_insp._weak_entity = weakref.ref(self)
814 attr = attr.adapt_to_entity(aliased_insp)
815 setattr(self, key, attr)
816
817 return attr
818
819 def __repr__(self) -> str:
820 return "<AliasedClass at 0x%x; %s>" % (
821 id(self),
822 self._aliased_insp._target.__name__,
823 )
824
825 def __str__(self) -> str:
826 return str(self._aliased_insp)
827
828
829@inspection._self_inspects
830class AliasedInsp(
831 ORMEntityColumnsClauseRole[_O],
832 ORMFromClauseRole,
833 HasCacheKey,
834 InspectionAttr,
835 MemoizedSlots,
836 inspection.Inspectable["AliasedInsp[_O]"],
837 Generic[_O],
838):
839 """Provide an inspection interface for an
840 :class:`.AliasedClass` object.
841
842 The :class:`.AliasedInsp` object is returned
843 given an :class:`.AliasedClass` using the
844 :func:`_sa.inspect` function::
845
846 from sqlalchemy import inspect
847 from sqlalchemy.orm import aliased
848
849 my_alias = aliased(MyMappedClass)
850 insp = inspect(my_alias)
851
852 Attributes on :class:`.AliasedInsp`
853 include:
854
855 * ``entity`` - the :class:`.AliasedClass` represented.
856 * ``mapper`` - the :class:`_orm.Mapper` mapping the underlying class.
857 * ``selectable`` - the :class:`_expression.Alias`
858 construct which ultimately
859 represents an aliased :class:`_schema.Table` or
860 :class:`_expression.Select`
861 construct.
862 * ``name`` - the name of the alias. Also is used as the attribute
863 name when returned in a result tuple from :class:`_query.Query`.
864 * ``with_polymorphic_mappers`` - collection of :class:`_orm.Mapper`
865 objects
866 indicating all those mappers expressed in the select construct
867 for the :class:`.AliasedClass`.
868 * ``polymorphic_on`` - an alternate column or SQL expression which
869 will be used as the "discriminator" for a polymorphic load.
870
871 .. seealso::
872
873 :ref:`inspection_toplevel`
874
875 """
876
877 __slots__ = (
878 "__weakref__",
879 "_weak_entity",
880 "mapper",
881 "selectable",
882 "name",
883 "_adapt_on_names",
884 "with_polymorphic_mappers",
885 "polymorphic_on",
886 "_use_mapper_path",
887 "_base_alias",
888 "represents_outer_join",
889 "persist_selectable",
890 "local_table",
891 "_is_with_polymorphic",
892 "_with_polymorphic_entities",
893 "_adapter",
894 "_target",
895 "__clause_element__",
896 "_memoized_values",
897 "_all_column_expressions",
898 "_nest_adapters",
899 )
900
901 _cache_key_traversal = [
902 ("name", visitors.ExtendedInternalTraversal.dp_string),
903 ("_adapt_on_names", visitors.ExtendedInternalTraversal.dp_boolean),
904 ("_use_mapper_path", visitors.ExtendedInternalTraversal.dp_boolean),
905 ("_target", visitors.ExtendedInternalTraversal.dp_inspectable),
906 ("selectable", visitors.ExtendedInternalTraversal.dp_clauseelement),
907 (
908 "with_polymorphic_mappers",
909 visitors.InternalTraversal.dp_has_cache_key_list,
910 ),
911 ("polymorphic_on", visitors.InternalTraversal.dp_clauseelement),
912 ]
913
914 mapper: Mapper[_O]
915 selectable: FromClause
916 _adapter: ORMAdapter
917 with_polymorphic_mappers: Sequence[Mapper[Any]]
918 _with_polymorphic_entities: Sequence[AliasedInsp[Any]]
919
920 _weak_entity: weakref.ref[AliasedClass[_O]]
921 """the AliasedClass that refers to this AliasedInsp"""
922
923 _target: Union[Type[_O], AliasedClass[_O]]
924 """the thing referenced by the AliasedClass/AliasedInsp.
925
926 In the vast majority of cases, this is the mapped class. However
927 it may also be another AliasedClass (alias of alias).
928
929 """
930
931 def __init__(
932 self,
933 entity: AliasedClass[_O],
934 inspected: _InternalEntityType[_O],
935 selectable: FromClause,
936 name: Optional[str],
937 with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]],
938 polymorphic_on: Optional[ColumnElement[Any]],
939 _base_alias: Optional[AliasedInsp[Any]],
940 _use_mapper_path: bool,
941 adapt_on_names: bool,
942 represents_outer_join: bool,
943 nest_adapters: bool,
944 ):
945 mapped_class_or_ac = inspected.entity
946 mapper = inspected.mapper
947
948 self._weak_entity = weakref.ref(entity)
949 self.mapper = mapper
950 self.selectable = self.persist_selectable = self.local_table = (
951 selectable
952 )
953 self.name = name
954 self.polymorphic_on = polymorphic_on
955 self._base_alias = weakref.ref(_base_alias or self)
956 self._use_mapper_path = _use_mapper_path
957 self.represents_outer_join = represents_outer_join
958 self._nest_adapters = nest_adapters
959
960 if with_polymorphic_mappers:
961 self._is_with_polymorphic = True
962 self.with_polymorphic_mappers = with_polymorphic_mappers
963 self._with_polymorphic_entities = []
964 for poly in self.with_polymorphic_mappers:
965 if poly is not mapper:
966 ent = AliasedClass(
967 poly.class_,
968 selectable,
969 base_alias=self,
970 adapt_on_names=adapt_on_names,
971 use_mapper_path=_use_mapper_path,
972 )
973
974 setattr(self.entity, poly.class_.__name__, ent)
975 self._with_polymorphic_entities.append(ent._aliased_insp)
976
977 else:
978 self._is_with_polymorphic = False
979 self.with_polymorphic_mappers = [mapper]
980
981 self._adapter = ORMAdapter(
982 _TraceAdaptRole.ALIASED_INSP,
983 mapper,
984 selectable=selectable,
985 equivalents=mapper._equivalent_columns,
986 adapt_on_names=adapt_on_names,
987 anonymize_labels=True,
988 # make sure the adapter doesn't try to grab other tables that
989 # are not even the thing we are mapping, such as embedded
990 # selectables in subqueries or CTEs. See issue #6060
991 adapt_from_selectables={
992 m.selectable
993 for m in self.with_polymorphic_mappers
994 if not adapt_on_names
995 },
996 limit_on_entity=False,
997 )
998
999 if nest_adapters:
1000 # supports "aliased class of aliased class" use case
1001 assert isinstance(inspected, AliasedInsp)
1002 self._adapter = inspected._adapter.wrap(self._adapter)
1003
1004 self._adapt_on_names = adapt_on_names
1005 self._target = mapped_class_or_ac
1006
1007 @classmethod
1008 def _alias_factory(
1009 cls,
1010 element: Union[_EntityType[_O], FromClause],
1011 alias: Optional[FromClause] = None,
1012 name: Optional[str] = None,
1013 flat: bool = False,
1014 adapt_on_names: bool = False,
1015 ) -> Union[AliasedClass[_O], FromClause]:
1016 if isinstance(element, FromClause):
1017 if adapt_on_names:
1018 raise sa_exc.ArgumentError(
1019 "adapt_on_names only applies to ORM elements"
1020 )
1021 if name:
1022 return element.alias(name=name, flat=flat)
1023 else:
1024 return coercions.expect(
1025 roles.AnonymizedFromClauseRole, element, flat=flat
1026 )
1027 else:
1028 return AliasedClass(
1029 element,
1030 alias=alias,
1031 flat=flat,
1032 name=name,
1033 adapt_on_names=adapt_on_names,
1034 )
1035
1036 @classmethod
1037 def _with_polymorphic_factory(
1038 cls,
1039 base: Union[Type[_O], Mapper[_O]],
1040 classes: Union[Literal["*"], Iterable[_EntityType[Any]]],
1041 selectable: Union[Literal[False, None], FromClause] = False,
1042 flat: bool = False,
1043 polymorphic_on: Optional[ColumnElement[Any]] = None,
1044 aliased: bool = False,
1045 innerjoin: bool = False,
1046 adapt_on_names: bool = False,
1047 name: Optional[str] = None,
1048 _use_mapper_path: bool = False,
1049 ) -> AliasedClass[_O]:
1050 primary_mapper = _class_to_mapper(base)
1051
1052 if selectable not in (None, False) and flat:
1053 raise sa_exc.ArgumentError(
1054 "the 'flat' and 'selectable' arguments cannot be passed "
1055 "simultaneously to with_polymorphic()"
1056 )
1057
1058 mappers, selectable = primary_mapper._with_polymorphic_args(
1059 classes, selectable, innerjoin=innerjoin
1060 )
1061 if aliased or flat:
1062 assert selectable is not None
1063 selectable = selectable._anonymous_fromclause(flat=flat)
1064
1065 return AliasedClass(
1066 base,
1067 selectable,
1068 name=name,
1069 with_polymorphic_mappers=mappers,
1070 adapt_on_names=adapt_on_names,
1071 with_polymorphic_discriminator=polymorphic_on,
1072 use_mapper_path=_use_mapper_path,
1073 represents_outer_join=not innerjoin,
1074 )
1075
1076 @property
1077 def entity(self) -> AliasedClass[_O]:
1078 # to eliminate reference cycles, the AliasedClass is held weakly.
1079 # this produces some situations where the AliasedClass gets lost,
1080 # particularly when one is created internally and only the AliasedInsp
1081 # is passed around.
1082 # to work around this case, we just generate a new one when we need
1083 # it, as it is a simple class with very little initial state on it.
1084 ent = self._weak_entity()
1085 if ent is None:
1086 ent = AliasedClass._reconstitute_from_aliased_insp(self)
1087 self._weak_entity = weakref.ref(ent)
1088 return ent
1089
1090 is_aliased_class = True
1091 "always returns True"
1092
1093 def _memoized_method___clause_element__(self) -> FromClause:
1094 return self.selectable._annotate(
1095 {
1096 "parentmapper": self.mapper,
1097 "parententity": self,
1098 "entity_namespace": self,
1099 }
1100 )._set_propagate_attrs(
1101 {"compile_state_plugin": "orm", "plugin_subject": self}
1102 )
1103
1104 @property
1105 def entity_namespace(self) -> AliasedClass[_O]:
1106 return self.entity
1107
1108 @property
1109 def class_(self) -> Type[_O]:
1110 """Return the mapped class ultimately represented by this
1111 :class:`.AliasedInsp`."""
1112 return self.mapper.class_
1113
1114 @property
1115 def _path_registry(self) -> _AbstractEntityRegistry:
1116 if self._use_mapper_path:
1117 return self.mapper._path_registry
1118 else:
1119 return PathRegistry.per_mapper(self)
1120
1121 def __getstate__(self) -> Dict[str, Any]:
1122 return {
1123 "entity": self.entity,
1124 "mapper": self.mapper,
1125 "alias": self.selectable,
1126 "name": self.name,
1127 "adapt_on_names": self._adapt_on_names,
1128 "with_polymorphic_mappers": self.with_polymorphic_mappers,
1129 "with_polymorphic_discriminator": self.polymorphic_on,
1130 "base_alias": self._base_alias(),
1131 "use_mapper_path": self._use_mapper_path,
1132 "represents_outer_join": self.represents_outer_join,
1133 "nest_adapters": self._nest_adapters,
1134 }
1135
1136 def __setstate__(self, state: Dict[str, Any]) -> None:
1137 self.__init__( # type: ignore
1138 state["entity"],
1139 state["mapper"],
1140 state["alias"],
1141 state["name"],
1142 state["with_polymorphic_mappers"],
1143 state["with_polymorphic_discriminator"],
1144 state["base_alias"],
1145 state["use_mapper_path"],
1146 state["adapt_on_names"],
1147 state["represents_outer_join"],
1148 state["nest_adapters"],
1149 )
1150
1151 def _merge_with(self, other: AliasedInsp[_O]) -> AliasedInsp[_O]:
1152 # assert self._is_with_polymorphic
1153 # assert other._is_with_polymorphic
1154
1155 primary_mapper = other.mapper
1156
1157 assert self.mapper is primary_mapper
1158
1159 our_classes = util.to_set(
1160 mp.class_ for mp in self.with_polymorphic_mappers
1161 )
1162 new_classes = {mp.class_ for mp in other.with_polymorphic_mappers}
1163 if our_classes == new_classes:
1164 return other
1165 else:
1166 classes = our_classes.union(new_classes)
1167
1168 mappers, selectable = primary_mapper._with_polymorphic_args(
1169 classes, None, innerjoin=not other.represents_outer_join
1170 )
1171 selectable = selectable._anonymous_fromclause(flat=True)
1172 return AliasedClass(
1173 primary_mapper,
1174 selectable,
1175 with_polymorphic_mappers=mappers,
1176 with_polymorphic_discriminator=other.polymorphic_on,
1177 use_mapper_path=other._use_mapper_path,
1178 represents_outer_join=other.represents_outer_join,
1179 )._aliased_insp
1180
1181 def _adapt_element(
1182 self, expr: _ORMCOLEXPR, key: Optional[str] = None
1183 ) -> _ORMCOLEXPR:
1184 assert isinstance(expr, ColumnElement)
1185 d: Dict[str, Any] = {
1186 "parententity": self,
1187 "parentmapper": self.mapper,
1188 }
1189 if key:
1190 d["proxy_key"] = key
1191
1192 # userspace adapt of an attribute from AliasedClass; validate that
1193 # it actually was present
1194 adapted = self._adapter.adapt_check_present(expr)
1195 if adapted is None:
1196 adapted = expr
1197 if self._adapter.adapt_on_names:
1198 util.warn_limited(
1199 "Did not locate an expression in selectable for "
1200 "attribute %r; ensure name is correct in expression",
1201 (key,),
1202 )
1203 else:
1204 util.warn_limited(
1205 "Did not locate an expression in selectable for "
1206 "attribute %r; to match by name, use the "
1207 "adapt_on_names parameter",
1208 (key,),
1209 )
1210
1211 return adapted._annotate(d)._set_propagate_attrs(
1212 {"compile_state_plugin": "orm", "plugin_subject": self}
1213 )
1214
1215 if TYPE_CHECKING:
1216 # establish compatibility with the _ORMAdapterProto protocol,
1217 # which in turn is compatible with _CoreAdapterProto.
1218
1219 def _orm_adapt_element(
1220 self,
1221 obj: _CE,
1222 key: Optional[str] = None,
1223 ) -> _CE: ...
1224
1225 else:
1226 _orm_adapt_element = _adapt_element
1227
1228 def _entity_for_mapper(self, mapper):
1229 self_poly = self.with_polymorphic_mappers
1230 if mapper in self_poly:
1231 if mapper is self.mapper:
1232 return self
1233 else:
1234 return getattr(
1235 self.entity, mapper.class_.__name__
1236 )._aliased_insp
1237 elif mapper.isa(self.mapper):
1238 return self
1239 else:
1240 assert False, "mapper %s doesn't correspond to %s" % (mapper, self)
1241
1242 def _memoized_attr__get_clause(self):
1243 onclause, replacemap = self.mapper._get_clause
1244 return (
1245 self._adapter.traverse(onclause),
1246 {
1247 self._adapter.traverse(col): param
1248 for col, param in replacemap.items()
1249 },
1250 )
1251
1252 def _memoized_attr__memoized_values(self):
1253 return {}
1254
1255 def _memoized_attr__all_column_expressions(self):
1256 if self._is_with_polymorphic:
1257 cols_plus_keys = self.mapper._columns_plus_keys(
1258 [ent.mapper for ent in self._with_polymorphic_entities]
1259 )
1260 else:
1261 cols_plus_keys = self.mapper._columns_plus_keys()
1262
1263 cols_plus_keys = [
1264 (key, self._adapt_element(col)) for key, col in cols_plus_keys
1265 ]
1266
1267 return ColumnCollection(cols_plus_keys)
1268
1269 def _memo(self, key, callable_, *args, **kw):
1270 if key in self._memoized_values:
1271 return self._memoized_values[key]
1272 else:
1273 self._memoized_values[key] = value = callable_(*args, **kw)
1274 return value
1275
1276 def __repr__(self):
1277 if self.with_polymorphic_mappers:
1278 with_poly = "(%s)" % ", ".join(
1279 mp.class_.__name__ for mp in self.with_polymorphic_mappers
1280 )
1281 else:
1282 with_poly = ""
1283 return "<AliasedInsp at 0x%x; %s%s>" % (
1284 id(self),
1285 self.class_.__name__,
1286 with_poly,
1287 )
1288
1289 def __str__(self):
1290 if self._is_with_polymorphic:
1291 return "with_polymorphic(%s, [%s])" % (
1292 self._target.__name__,
1293 ", ".join(
1294 mp.class_.__name__
1295 for mp in self.with_polymorphic_mappers
1296 if mp is not self.mapper
1297 ),
1298 )
1299 else:
1300 return "aliased(%s)" % (self._target.__name__,)
1301
1302
1303class _WrapUserEntity:
1304 """A wrapper used within the loader_criteria lambda caller so that
1305 we can bypass declared_attr descriptors on unmapped mixins, which
1306 normally emit a warning for such use.
1307
1308 might also be useful for other per-lambda instrumentations should
1309 the need arise.
1310
1311 """
1312
1313 __slots__ = ("subject",)
1314
1315 def __init__(self, subject):
1316 self.subject = subject
1317
1318 @util.preload_module("sqlalchemy.orm.decl_api")
1319 def __getattribute__(self, name):
1320 decl_api = util.preloaded.orm.decl_api
1321
1322 subject = object.__getattribute__(self, "subject")
1323 if name in subject.__dict__ and isinstance(
1324 subject.__dict__[name], decl_api.declared_attr
1325 ):
1326 return subject.__dict__[name].fget(subject)
1327 else:
1328 return getattr(subject, name)
1329
1330
1331class LoaderCriteriaOption(CriteriaOption):
1332 """Add additional WHERE criteria to the load for all occurrences of
1333 a particular entity.
1334
1335 :class:`_orm.LoaderCriteriaOption` is invoked using the
1336 :func:`_orm.with_loader_criteria` function; see that function for
1337 details.
1338
1339 .. versionadded:: 1.4
1340
1341 """
1342
1343 __slots__ = (
1344 "root_entity",
1345 "entity",
1346 "deferred_where_criteria",
1347 "where_criteria",
1348 "_where_crit_orig",
1349 "include_aliases",
1350 "propagate_to_loaders",
1351 )
1352
1353 _traverse_internals = [
1354 ("root_entity", visitors.ExtendedInternalTraversal.dp_plain_obj),
1355 ("entity", visitors.ExtendedInternalTraversal.dp_has_cache_key),
1356 ("where_criteria", visitors.InternalTraversal.dp_clauseelement),
1357 ("include_aliases", visitors.InternalTraversal.dp_boolean),
1358 ("propagate_to_loaders", visitors.InternalTraversal.dp_boolean),
1359 ]
1360
1361 root_entity: Optional[Type[Any]]
1362 entity: Optional[_InternalEntityType[Any]]
1363 where_criteria: Union[ColumnElement[bool], lambdas.DeferredLambdaElement]
1364 deferred_where_criteria: bool
1365 include_aliases: bool
1366 propagate_to_loaders: bool
1367
1368 _where_crit_orig: Any
1369
1370 def __init__(
1371 self,
1372 entity_or_base: _EntityType[Any],
1373 where_criteria: Union[
1374 _ColumnExpressionArgument[bool],
1375 Callable[[Any], _ColumnExpressionArgument[bool]],
1376 ],
1377 loader_only: bool = False,
1378 include_aliases: bool = False,
1379 propagate_to_loaders: bool = True,
1380 track_closure_variables: bool = True,
1381 ):
1382 entity = cast(
1383 "_InternalEntityType[Any]",
1384 inspection.inspect(entity_or_base, False),
1385 )
1386 if entity is None:
1387 self.root_entity = cast("Type[Any]", entity_or_base)
1388 self.entity = None
1389 else:
1390 self.root_entity = None
1391 self.entity = entity
1392
1393 self._where_crit_orig = where_criteria
1394 if callable(where_criteria):
1395 if self.root_entity is not None:
1396 wrap_entity = self.root_entity
1397 else:
1398 assert entity is not None
1399 wrap_entity = entity.entity
1400
1401 self.deferred_where_criteria = True
1402 self.where_criteria = lambdas.DeferredLambdaElement(
1403 where_criteria,
1404 roles.WhereHavingRole,
1405 lambda_args=(_WrapUserEntity(wrap_entity),),
1406 opts=lambdas.LambdaOptions(
1407 track_closure_variables=track_closure_variables
1408 ),
1409 )
1410 else:
1411 self.deferred_where_criteria = False
1412 self.where_criteria = coercions.expect(
1413 roles.WhereHavingRole, where_criteria
1414 )
1415
1416 self.include_aliases = include_aliases
1417 self.propagate_to_loaders = propagate_to_loaders
1418
1419 @classmethod
1420 def _unreduce(
1421 cls, entity, where_criteria, include_aliases, propagate_to_loaders
1422 ):
1423 return LoaderCriteriaOption(
1424 entity,
1425 where_criteria,
1426 include_aliases=include_aliases,
1427 propagate_to_loaders=propagate_to_loaders,
1428 )
1429
1430 def __reduce__(self):
1431 return (
1432 LoaderCriteriaOption._unreduce,
1433 (
1434 self.entity.class_ if self.entity else self.root_entity,
1435 self._where_crit_orig,
1436 self.include_aliases,
1437 self.propagate_to_loaders,
1438 ),
1439 )
1440
1441 def _all_mappers(self) -> Iterator[Mapper[Any]]:
1442 if self.entity:
1443 yield from self.entity.mapper.self_and_descendants
1444 else:
1445 assert self.root_entity
1446 stack = list(self.root_entity.__subclasses__())
1447 while stack:
1448 subclass = stack.pop(0)
1449 ent = cast(
1450 "_InternalEntityType[Any]",
1451 inspection.inspect(subclass, raiseerr=False),
1452 )
1453 if ent:
1454 yield from ent.mapper.self_and_descendants
1455 else:
1456 stack.extend(subclass.__subclasses__())
1457
1458 def _should_include(self, compile_state: _ORMCompileState) -> bool:
1459 if (
1460 compile_state.select_statement._annotations.get(
1461 "for_loader_criteria", None
1462 )
1463 is self
1464 ):
1465 return False
1466 return True
1467
1468 def _resolve_where_criteria(
1469 self, ext_info: _InternalEntityType[Any]
1470 ) -> ColumnElement[bool]:
1471 if self.deferred_where_criteria:
1472 crit = cast(
1473 "ColumnElement[bool]",
1474 self.where_criteria._resolve_with_args(ext_info.entity),
1475 )
1476 else:
1477 crit = self.where_criteria # type: ignore
1478 assert isinstance(crit, ColumnElement)
1479 return sql_util._deep_annotate(
1480 crit,
1481 {"for_loader_criteria": self},
1482 detect_subquery_cols=True,
1483 ind_cols_on_fromclause=True,
1484 )
1485
1486 def process_compile_state_replaced_entities(
1487 self,
1488 compile_state: _ORMCompileState,
1489 mapper_entities: Iterable[_MapperEntity],
1490 ) -> None:
1491 self.process_compile_state(compile_state)
1492
1493 def process_compile_state(self, compile_state: _ORMCompileState) -> None:
1494 """Apply a modification to a given :class:`.CompileState`."""
1495
1496 # if options to limit the criteria to immediate query only,
1497 # use compile_state.attributes instead
1498
1499 self.get_global_criteria(compile_state.global_attributes)
1500
1501 def get_global_criteria(self, attributes: Dict[Any, Any]) -> None:
1502 for mp in self._all_mappers():
1503 load_criteria = attributes.setdefault(
1504 ("additional_entity_criteria", mp), []
1505 )
1506
1507 load_criteria.append(self)
1508
1509
1510inspection._inspects(AliasedClass)(lambda target: target._aliased_insp)
1511
1512
1513@inspection._inspects(type)
1514def _inspect_mc(
1515 class_: Type[_O],
1516) -> Optional[Mapper[_O]]:
1517 try:
1518 class_manager = opt_manager_of_class(class_)
1519 if class_manager is None or not class_manager.is_mapped:
1520 return None
1521 mapper = class_manager.mapper
1522 except exc.NO_STATE:
1523 return None
1524 else:
1525 return mapper
1526
1527
1528GenericAlias = type(List[Any])
1529
1530
1531@inspection._inspects(GenericAlias)
1532def _inspect_generic_alias(
1533 class_: Type[_O],
1534) -> Optional[Mapper[_O]]:
1535 origin = cast("Type[_O]", get_origin(class_))
1536 return _inspect_mc(origin)
1537
1538
1539@inspection._self_inspects
1540class Bundle(
1541 ORMColumnsClauseRole[_T],
1542 SupportsCloneAnnotations,
1543 MemoizedHasCacheKey,
1544 inspection.Inspectable["Bundle[_T]"],
1545 InspectionAttr,
1546):
1547 """A grouping of SQL expressions that are returned by a :class:`.Query`
1548 under one namespace.
1549
1550 The :class:`.Bundle` essentially allows nesting of the tuple-based
1551 results returned by a column-oriented :class:`_query.Query` object.
1552 It also
1553 is extensible via simple subclassing, where the primary capability
1554 to override is that of how the set of expressions should be returned,
1555 allowing post-processing as well as custom return types, without
1556 involving ORM identity-mapped classes.
1557
1558 .. seealso::
1559
1560 :ref:`bundles`
1561
1562
1563 """
1564
1565 single_entity = False
1566 """If True, queries for a single Bundle will be returned as a single
1567 entity, rather than an element within a keyed tuple."""
1568
1569 is_clause_element = False
1570
1571 is_mapper = False
1572
1573 is_aliased_class = False
1574
1575 is_bundle = True
1576
1577 _propagate_attrs: _PropagateAttrsType = util.immutabledict()
1578
1579 proxy_set = util.EMPTY_SET
1580
1581 exprs: List[_ColumnsClauseElement]
1582
1583 def __init__(
1584 self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
1585 ):
1586 r"""Construct a new :class:`.Bundle`.
1587
1588 e.g.::
1589
1590 bn = Bundle("mybundle", MyClass.x, MyClass.y)
1591
1592 for row in session.query(bn).filter(bn.c.x == 5).filter(bn.c.y == 4):
1593 print(row.mybundle.x, row.mybundle.y)
1594
1595 :param name: name of the bundle.
1596 :param \*exprs: columns or SQL expressions comprising the bundle.
1597 :param single_entity=False: if True, rows for this :class:`.Bundle`
1598 can be returned as a "single entity" outside of any enclosing tuple
1599 in the same manner as a mapped entity.
1600
1601 """ # noqa: E501
1602 self.name = self._label = name
1603 coerced_exprs = [
1604 coercions.expect(
1605 roles.ColumnsClauseRole, expr, apply_propagate_attrs=self
1606 )
1607 for expr in exprs
1608 ]
1609 self.exprs = coerced_exprs
1610
1611 self.c = self.columns = ColumnCollection(
1612 (getattr(col, "key", col._label), col)
1613 for col in [e._annotations.get("bundle", e) for e in coerced_exprs]
1614 ).as_readonly()
1615 self.single_entity = kw.pop("single_entity", self.single_entity)
1616
1617 def _gen_cache_key(
1618 self, anon_map: anon_map, bindparams: List[BindParameter[Any]]
1619 ) -> Tuple[Any, ...]:
1620 return (self.__class__, self.name, self.single_entity) + tuple(
1621 [expr._gen_cache_key(anon_map, bindparams) for expr in self.exprs]
1622 )
1623
1624 @property
1625 def mapper(self) -> Optional[Mapper[Any]]:
1626 mp: Optional[Mapper[Any]] = self.exprs[0]._annotations.get(
1627 "parentmapper", None
1628 )
1629 return mp
1630
1631 @property
1632 def entity(self) -> Optional[_InternalEntityType[Any]]:
1633 ie: Optional[_InternalEntityType[Any]] = self.exprs[
1634 0
1635 ]._annotations.get("parententity", None)
1636 return ie
1637
1638 @property
1639 def entity_namespace(
1640 self,
1641 ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]:
1642 return self.c
1643
1644 columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
1645
1646 """A namespace of SQL expressions referred to by this :class:`.Bundle`.
1647
1648 e.g.::
1649
1650 bn = Bundle("mybundle", MyClass.x, MyClass.y)
1651
1652 q = sess.query(bn).filter(bn.c.x == 5)
1653
1654 Nesting of bundles is also supported::
1655
1656 b1 = Bundle(
1657 "b1",
1658 Bundle("b2", MyClass.a, MyClass.b),
1659 Bundle("b3", MyClass.x, MyClass.y),
1660 )
1661
1662 q = sess.query(b1).filter(b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9)
1663
1664 .. seealso::
1665
1666 :attr:`.Bundle.c`
1667
1668 """ # noqa: E501
1669
1670 c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
1671 """An alias for :attr:`.Bundle.columns`."""
1672
1673 def _clone(self, **kw):
1674 cloned = self.__class__.__new__(self.__class__)
1675 cloned.__dict__.update(self.__dict__)
1676 return cloned
1677
1678 def __clause_element__(self):
1679 # ensure existing entity_namespace remains
1680 annotations = {"bundle": self, "entity_namespace": self}
1681 annotations.update(self._annotations)
1682
1683 plugin_subject = self.exprs[0]._propagate_attrs.get(
1684 "plugin_subject", self.entity
1685 )
1686 return (
1687 expression.ClauseList(
1688 _literal_as_text_role=roles.ColumnsClauseRole,
1689 group=False,
1690 *[e._annotations.get("bundle", e) for e in self.exprs],
1691 )
1692 ._annotate(annotations)
1693 ._set_propagate_attrs(
1694 # the Bundle *must* use the orm plugin no matter what. the
1695 # subject can be None but it's much better if it's not.
1696 {
1697 "compile_state_plugin": "orm",
1698 "plugin_subject": plugin_subject,
1699 }
1700 )
1701 )
1702
1703 @property
1704 def clauses(self):
1705 return self.__clause_element__().clauses
1706
1707 def label(self, name):
1708 """Provide a copy of this :class:`.Bundle` passing a new label."""
1709
1710 cloned = self._clone()
1711 cloned.name = name
1712 return cloned
1713
1714 def create_row_processor(
1715 self,
1716 query: Select[Unpack[TupleAny]],
1717 procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]],
1718 labels: Sequence[str],
1719 ) -> Callable[[Row[Unpack[TupleAny]]], Any]:
1720 """Produce the "row processing" function for this :class:`.Bundle`.
1721
1722 May be overridden by subclasses to provide custom behaviors when
1723 results are fetched. The method is passed the statement object and a
1724 set of "row processor" functions at query execution time; these
1725 processor functions when given a result row will return the individual
1726 attribute value, which can then be adapted into any kind of return data
1727 structure.
1728
1729 The example below illustrates replacing the usual :class:`.Row`
1730 return structure with a straight Python dictionary::
1731
1732 from sqlalchemy.orm import Bundle
1733
1734
1735 class DictBundle(Bundle):
1736 def create_row_processor(self, query, procs, labels):
1737 "Override create_row_processor to return values as dictionaries"
1738
1739 def proc(row):
1740 return dict(zip(labels, (proc(row) for proc in procs)))
1741
1742 return proc
1743
1744 A result from the above :class:`_orm.Bundle` will return dictionary
1745 values::
1746
1747 bn = DictBundle("mybundle", MyClass.data1, MyClass.data2)
1748 for row in session.execute(select(bn)).where(bn.c.data1 == "d1"):
1749 print(row.mybundle["data1"], row.mybundle["data2"])
1750
1751 """ # noqa: E501
1752 keyed_tuple = result_tuple(labels, [() for l in labels])
1753
1754 def proc(row: Row[Unpack[TupleAny]]) -> Any:
1755 return keyed_tuple([proc(row) for proc in procs])
1756
1757 return proc
1758
1759
1760def _orm_full_deannotate(element: _SA) -> _SA:
1761 return sql_util._deep_deannotate(element)
1762
1763
1764class _ORMJoin(expression.Join):
1765 """Extend Join to support ORM constructs as input."""
1766
1767 __visit_name__ = expression.Join.__visit_name__
1768
1769 inherit_cache = True
1770
1771 def __init__(
1772 self,
1773 left: _FromClauseArgument,
1774 right: _FromClauseArgument,
1775 onclause: Optional[_OnClauseArgument] = None,
1776 isouter: bool = False,
1777 full: bool = False,
1778 _left_memo: Optional[Any] = None,
1779 _right_memo: Optional[Any] = None,
1780 _extra_criteria: Tuple[ColumnElement[bool], ...] = (),
1781 ):
1782 left_info = cast(
1783 "Union[FromClause, _InternalEntityType[Any]]",
1784 inspection.inspect(left),
1785 )
1786
1787 right_info = cast(
1788 "Union[FromClause, _InternalEntityType[Any]]",
1789 inspection.inspect(right),
1790 )
1791 adapt_to = right_info.selectable
1792
1793 # used by joined eager loader
1794 self._left_memo = _left_memo
1795 self._right_memo = _right_memo
1796
1797 if isinstance(onclause, attributes.QueryableAttribute):
1798 if TYPE_CHECKING:
1799 assert isinstance(
1800 onclause.comparator, RelationshipProperty.Comparator
1801 )
1802 on_selectable = onclause.comparator._source_selectable()
1803 prop = onclause.property
1804 _extra_criteria += onclause._extra_criteria
1805 elif isinstance(onclause, MapperProperty):
1806 # used internally by joined eager loader...possibly not ideal
1807 prop = onclause
1808 on_selectable = prop.parent.selectable
1809 else:
1810 prop = None
1811 on_selectable = None
1812
1813 left_selectable = left_info.selectable
1814 if prop:
1815 adapt_from: Optional[FromClause]
1816 if sql_util.clause_is_present(on_selectable, left_selectable):
1817 adapt_from = on_selectable
1818 else:
1819 assert isinstance(left_selectable, FromClause)
1820 adapt_from = left_selectable
1821
1822 (
1823 pj,
1824 sj,
1825 source,
1826 dest,
1827 secondary,
1828 target_adapter,
1829 ) = prop._create_joins(
1830 source_selectable=adapt_from,
1831 dest_selectable=adapt_to,
1832 source_polymorphic=True,
1833 of_type_entity=right_info,
1834 alias_secondary=True,
1835 extra_criteria=_extra_criteria,
1836 )
1837
1838 if sj is not None:
1839 if isouter:
1840 # note this is an inner join from secondary->right
1841 right = sql.join(secondary, right, sj)
1842 onclause = pj
1843 else:
1844 left = sql.join(left, secondary, pj, isouter)
1845 onclause = sj
1846 else:
1847 onclause = pj
1848
1849 self._target_adapter = target_adapter
1850
1851 # we don't use the normal coercions logic for _ORMJoin
1852 # (probably should), so do some gymnastics to get the entity.
1853 # logic here is for #8721, which was a major bug in 1.4
1854 # for almost two years, not reported/fixed until 1.4.43 (!)
1855 if is_selectable(left_info):
1856 parententity = left_selectable._annotations.get(
1857 "parententity", None
1858 )
1859 elif insp_is_mapper(left_info) or insp_is_aliased_class(left_info):
1860 parententity = left_info
1861 else:
1862 parententity = None
1863
1864 if parententity is not None:
1865 self._annotations = self._annotations.union(
1866 {"parententity": parententity}
1867 )
1868
1869 augment_onclause = bool(_extra_criteria) and not prop
1870 expression.Join.__init__(self, left, right, onclause, isouter, full)
1871
1872 assert self.onclause is not None
1873
1874 if augment_onclause:
1875 self.onclause &= sql.and_(*_extra_criteria)
1876
1877 if (
1878 not prop
1879 and getattr(right_info, "mapper", None)
1880 and right_info.mapper.single # type: ignore
1881 ):
1882 right_info = cast("_InternalEntityType[Any]", right_info)
1883 # if single inheritance target and we are using a manual
1884 # or implicit ON clause, augment it the same way we'd augment the
1885 # WHERE.
1886 single_crit = right_info.mapper._single_table_criterion
1887 if single_crit is not None:
1888 if insp_is_aliased_class(right_info):
1889 single_crit = right_info._adapter.traverse(single_crit)
1890 self.onclause = self.onclause & single_crit
1891
1892 def _splice_into_center(self, other):
1893 """Splice a join into the center.
1894
1895 Given join(a, b) and join(b, c), return join(a, b).join(c)
1896
1897 """
1898 leftmost = other
1899 while isinstance(leftmost, sql.Join):
1900 leftmost = leftmost.left
1901
1902 assert self.right is leftmost
1903
1904 left = _ORMJoin(
1905 self.left,
1906 other.left,
1907 self.onclause,
1908 isouter=self.isouter,
1909 _left_memo=self._left_memo,
1910 _right_memo=other._left_memo._path_registry,
1911 )
1912
1913 return _ORMJoin(
1914 left,
1915 other.right,
1916 other.onclause,
1917 isouter=other.isouter,
1918 _right_memo=other._right_memo,
1919 )
1920
1921 def join(
1922 self,
1923 right: _FromClauseArgument,
1924 onclause: Optional[_OnClauseArgument] = None,
1925 isouter: bool = False,
1926 full: bool = False,
1927 ) -> _ORMJoin:
1928 return _ORMJoin(self, right, onclause, full=full, isouter=isouter)
1929
1930 def outerjoin(
1931 self,
1932 right: _FromClauseArgument,
1933 onclause: Optional[_OnClauseArgument] = None,
1934 full: bool = False,
1935 ) -> _ORMJoin:
1936 return _ORMJoin(self, right, onclause, isouter=True, full=full)
1937
1938
1939def with_parent(
1940 instance: object,
1941 prop: attributes.QueryableAttribute[Any],
1942 from_entity: Optional[_EntityType[Any]] = None,
1943) -> ColumnElement[bool]:
1944 """Create filtering criterion that relates this query's primary entity
1945 to the given related instance, using established
1946 :func:`_orm.relationship()`
1947 configuration.
1948
1949 E.g.::
1950
1951 stmt = select(Address).where(with_parent(some_user, User.addresses))
1952
1953 The SQL rendered is the same as that rendered when a lazy loader
1954 would fire off from the given parent on that attribute, meaning
1955 that the appropriate state is taken from the parent object in
1956 Python without the need to render joins to the parent table
1957 in the rendered statement.
1958
1959 The given property may also make use of :meth:`_orm.PropComparator.of_type`
1960 to indicate the left side of the criteria::
1961
1962
1963 a1 = aliased(Address)
1964 a2 = aliased(Address)
1965 stmt = select(a1, a2).where(with_parent(u1, User.addresses.of_type(a2)))
1966
1967 The above use is equivalent to using the
1968 :func:`_orm.with_parent.from_entity` argument::
1969
1970 a1 = aliased(Address)
1971 a2 = aliased(Address)
1972 stmt = select(a1, a2).where(
1973 with_parent(u1, User.addresses, from_entity=a2)
1974 )
1975
1976 :param instance:
1977 An instance which has some :func:`_orm.relationship`.
1978
1979 :param property:
1980 Class-bound attribute, which indicates
1981 what relationship from the instance should be used to reconcile the
1982 parent/child relationship.
1983
1984 :param from_entity:
1985 Entity in which to consider as the left side. This defaults to the
1986 "zero" entity of the :class:`_query.Query` itself.
1987
1988 """ # noqa: E501
1989 prop_t: RelationshipProperty[Any]
1990
1991 if isinstance(prop, str):
1992 raise sa_exc.ArgumentError(
1993 "with_parent() accepts class-bound mapped attributes, not strings"
1994 )
1995 elif isinstance(prop, attributes.QueryableAttribute):
1996 if prop._of_type:
1997 from_entity = prop._of_type
1998 mapper_property = prop.property
1999 if mapper_property is None or not prop_is_relationship(
2000 mapper_property
2001 ):
2002 raise sa_exc.ArgumentError(
2003 f"Expected relationship property for with_parent(), "
2004 f"got {mapper_property}"
2005 )
2006 prop_t = mapper_property
2007 else:
2008 prop_t = prop
2009
2010 return prop_t._with_parent(instance, from_entity=from_entity)
2011
2012
2013def has_identity(object_: object) -> bool:
2014 """Return True if the given object has a database
2015 identity.
2016
2017 This typically corresponds to the object being
2018 in either the persistent or detached state.
2019
2020 .. seealso::
2021
2022 :func:`.was_deleted`
2023
2024 """
2025 state = attributes.instance_state(object_)
2026 return state.has_identity
2027
2028
2029def was_deleted(object_: object) -> bool:
2030 """Return True if the given object was deleted
2031 within a session flush.
2032
2033 This is regardless of whether or not the object is
2034 persistent or detached.
2035
2036 .. seealso::
2037
2038 :attr:`.InstanceState.was_deleted`
2039
2040 """
2041
2042 state = attributes.instance_state(object_)
2043 return state.was_deleted
2044
2045
2046def _entity_corresponds_to(
2047 given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
2048) -> bool:
2049 """determine if 'given' corresponds to 'entity', in terms
2050 of an entity passed to Query that would match the same entity
2051 being referred to elsewhere in the query.
2052
2053 """
2054 if insp_is_aliased_class(entity):
2055 if insp_is_aliased_class(given):
2056 if entity._base_alias() is given._base_alias():
2057 return True
2058 return False
2059 elif insp_is_aliased_class(given):
2060 if given._use_mapper_path:
2061 return entity in given.with_polymorphic_mappers
2062 else:
2063 return entity is given
2064
2065 assert insp_is_mapper(given)
2066 return entity.common_parent(given)
2067
2068
2069def _entity_corresponds_to_use_path_impl(
2070 given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
2071) -> bool:
2072 """determine if 'given' corresponds to 'entity', in terms
2073 of a path of loader options where a mapped attribute is taken to
2074 be a member of a parent entity.
2075
2076 e.g.::
2077
2078 someoption(A).someoption(A.b) # -> fn(A, A) -> True
2079 someoption(A).someoption(C.d) # -> fn(A, C) -> False
2080
2081 a1 = aliased(A)
2082 someoption(a1).someoption(A.b) # -> fn(a1, A) -> False
2083 someoption(a1).someoption(a1.b) # -> fn(a1, a1) -> True
2084
2085 wp = with_polymorphic(A, [A1, A2])
2086 someoption(wp).someoption(A1.foo) # -> fn(wp, A1) -> False
2087 someoption(wp).someoption(wp.A1.foo) # -> fn(wp, wp.A1) -> True
2088
2089 """
2090 if insp_is_aliased_class(given):
2091 return (
2092 insp_is_aliased_class(entity)
2093 and not entity._use_mapper_path
2094 and (given is entity or entity in given._with_polymorphic_entities)
2095 )
2096 elif not insp_is_aliased_class(entity):
2097 return given.isa(entity.mapper)
2098 else:
2099 return (
2100 entity._use_mapper_path
2101 and given in entity.with_polymorphic_mappers
2102 )
2103
2104
2105def _entity_isa(given: _InternalEntityType[Any], mapper: Mapper[Any]) -> bool:
2106 """determine if 'given' "is a" mapper, in terms of the given
2107 would load rows of type 'mapper'.
2108
2109 """
2110 if given.is_aliased_class:
2111 return mapper in given.with_polymorphic_mappers or given.mapper.isa(
2112 mapper
2113 )
2114 elif given.with_polymorphic_mappers:
2115 return mapper in given.with_polymorphic_mappers or given.isa(mapper)
2116 else:
2117 return given.isa(mapper)
2118
2119
2120def _getitem(iterable_query: Query[Any], item: Any) -> Any:
2121 """calculate __getitem__ in terms of an iterable query object
2122 that also has a slice() method.
2123
2124 """
2125
2126 def _no_negative_indexes():
2127 raise IndexError(
2128 "negative indexes are not accepted by SQL "
2129 "index / slice operators"
2130 )
2131
2132 if isinstance(item, slice):
2133 start, stop, step = util.decode_slice(item)
2134
2135 if (
2136 isinstance(stop, int)
2137 and isinstance(start, int)
2138 and stop - start <= 0
2139 ):
2140 return []
2141
2142 elif (isinstance(start, int) and start < 0) or (
2143 isinstance(stop, int) and stop < 0
2144 ):
2145 _no_negative_indexes()
2146
2147 res = iterable_query.slice(start, stop)
2148 if step is not None:
2149 return list(res)[None : None : item.step]
2150 else:
2151 return list(res)
2152 else:
2153 if item == -1:
2154 _no_negative_indexes()
2155 else:
2156 return list(iterable_query[item : item + 1])[0]
2157
2158
2159def _is_mapped_annotation(
2160 raw_annotation: _AnnotationScanType,
2161 cls: Type[Any],
2162 originating_cls: Type[Any],
2163) -> bool:
2164 try:
2165 annotated = de_stringify_annotation(
2166 cls, raw_annotation, originating_cls.__module__
2167 )
2168 except NameError:
2169 # in most cases, at least within our own tests, we can raise
2170 # here, which is more accurate as it prevents us from returning
2171 # false negatives. However, in the real world, try to avoid getting
2172 # involved with end-user annotations that have nothing to do with us.
2173 # see issue #8888 where we bypass using this function in the case
2174 # that we want to detect an unresolvable Mapped[] type.
2175 return False
2176 else:
2177 return is_origin_of_cls(annotated, _MappedAnnotationBase)
2178
2179
2180class _CleanupError(Exception):
2181 pass
2182
2183
2184def _cleanup_mapped_str_annotation(
2185 annotation: str, originating_module: str
2186) -> str:
2187 # fix up an annotation that comes in as the form:
2188 # 'Mapped[List[Address]]' so that it instead looks like:
2189 # 'Mapped[List["Address"]]' , which will allow us to get
2190 # "Address" as a string
2191
2192 # additionally, resolve symbols for these names since this is where
2193 # we'd have to do it
2194
2195 inner: Optional[Match[str]]
2196
2197 mm = re.match(r"^([^ \|]+?)\[(.+)\]$", annotation)
2198
2199 if not mm:
2200 return annotation
2201
2202 # ticket #8759. Resolve the Mapped name to a real symbol.
2203 # originally this just checked the name.
2204 try:
2205 obj = eval_name_only(mm.group(1), originating_module)
2206 except NameError as ne:
2207 raise _CleanupError(
2208 f'For annotation "{annotation}", could not resolve '
2209 f'container type "{mm.group(1)}". '
2210 "Please ensure this type is imported at the module level "
2211 "outside of TYPE_CHECKING blocks"
2212 ) from ne
2213
2214 if obj is typing.ClassVar:
2215 real_symbol = "ClassVar"
2216 else:
2217 try:
2218 if issubclass(obj, _MappedAnnotationBase):
2219 real_symbol = obj.__name__
2220 else:
2221 return annotation
2222 except TypeError:
2223 # avoid isinstance(obj, type) check, just catch TypeError
2224 return annotation
2225
2226 # note: if one of the codepaths above didn't define real_symbol and
2227 # then didn't return, real_symbol raises UnboundLocalError
2228 # which is actually a NameError, and the calling routines don't
2229 # notice this since they are catching NameError anyway. Just in case
2230 # this is being modified in the future, something to be aware of.
2231
2232 stack = []
2233 inner = mm
2234 while True:
2235 stack.append(real_symbol if mm is inner else inner.group(1))
2236 g2 = inner.group(2)
2237 inner = re.match(r"^([^ \|]+?)\[(.+)\]$", g2)
2238 if inner is None:
2239 stack.append(g2)
2240 break
2241
2242 # stacks we want to rewrite, that is, quote the last entry which
2243 # we think is a relationship class name:
2244 #
2245 # ['Mapped', 'List', 'Address']
2246 # ['Mapped', 'A']
2247 #
2248 # stacks we dont want to rewrite, which are generally MappedColumn
2249 # use cases:
2250 #
2251 # ['Mapped', "'Optional[Dict[str, str]]'"]
2252 # ['Mapped', 'dict[str, str] | None']
2253
2254 if (
2255 # avoid already quoted symbols such as
2256 # ['Mapped', "'Optional[Dict[str, str]]'"]
2257 not re.match(r"""^["'].*["']$""", stack[-1])
2258 # avoid further generics like Dict[] such as
2259 # ['Mapped', 'dict[str, str] | None'],
2260 # ['Mapped', 'list[int] | list[str]'],
2261 # ['Mapped', 'Union[list[int], list[str]]'],
2262 and not re.search(r"[\[\]]", stack[-1])
2263 ):
2264 stripchars = "\"' "
2265 stack[-1] = ", ".join(
2266 f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",")
2267 )
2268
2269 annotation = "[".join(stack) + ("]" * (len(stack) - 1))
2270
2271 return annotation
2272
2273
2274def _extract_mapped_subtype(
2275 raw_annotation: Optional[_AnnotationScanType],
2276 cls: type,
2277 originating_module: str,
2278 key: str,
2279 attr_cls: Type[Any],
2280 required: bool,
2281 is_dataclass_field: bool,
2282 expect_mapped: bool = True,
2283 raiseerr: bool = True,
2284) -> Optional[Tuple[Union[_AnnotationScanType, str], Optional[type]]]:
2285 """given an annotation, figure out if it's ``Mapped[something]`` and if
2286 so, return the ``something`` part.
2287
2288 Includes error raise scenarios and other options.
2289
2290 """
2291
2292 if raw_annotation is None:
2293 if required:
2294 raise orm_exc.MappedAnnotationError(
2295 f"Python typing annotation is required for attribute "
2296 f'"{cls.__name__}.{key}" when primary argument(s) for '
2297 f'"{attr_cls.__name__}" construct are None or not present'
2298 )
2299 return None
2300
2301 try:
2302 # destringify the "outside" of the annotation. note we are not
2303 # adding include_generic so it will *not* dig into generic contents,
2304 # which will remain as ForwardRef or plain str under future annotations
2305 # mode. The full destringify happens later when mapped_column goes
2306 # to do a full lookup in the registry type_annotations_map.
2307 annotated = de_stringify_annotation(
2308 cls,
2309 raw_annotation,
2310 originating_module,
2311 str_cleanup_fn=_cleanup_mapped_str_annotation,
2312 )
2313 except _CleanupError as ce:
2314 raise orm_exc.MappedAnnotationError(
2315 f"Could not interpret annotation {raw_annotation}. "
2316 "Check that it uses names that are correctly imported at the "
2317 "module level. See chained stack trace for more hints."
2318 ) from ce
2319 except NameError as ne:
2320 if raiseerr and "Mapped[" in raw_annotation: # type: ignore
2321 raise orm_exc.MappedAnnotationError(
2322 f"Could not interpret annotation {raw_annotation}. "
2323 "Check that it uses names that are correctly imported at the "
2324 "module level. See chained stack trace for more hints."
2325 ) from ne
2326
2327 annotated = raw_annotation # type: ignore
2328
2329 if is_dataclass_field:
2330 return annotated, None
2331 else:
2332 if not hasattr(annotated, "__origin__") or not is_origin_of_cls(
2333 annotated, _MappedAnnotationBase
2334 ):
2335 if expect_mapped:
2336 if not raiseerr:
2337 return None
2338
2339 origin = getattr(annotated, "__origin__", None)
2340 if origin is typing.ClassVar:
2341 return None
2342
2343 # check for other kind of ORM descriptor like AssociationProxy,
2344 # don't raise for that (issue #9957)
2345 elif isinstance(origin, type) and issubclass(
2346 origin, ORMDescriptor
2347 ):
2348 return None
2349
2350 raise orm_exc.MappedAnnotationError(
2351 f'Type annotation for "{cls.__name__}.{key}" '
2352 "can't be correctly interpreted for "
2353 "Annotated Declarative Table form. ORM annotations "
2354 "should normally make use of the ``Mapped[]`` generic "
2355 "type, or other ORM-compatible generic type, as a "
2356 "container for the actual type, which indicates the "
2357 "intent that the attribute is mapped. "
2358 "Class variables that are not intended to be mapped "
2359 "by the ORM should use ClassVar[]. "
2360 "To allow Annotated Declarative to disregard legacy "
2361 "annotations which don't use Mapped[] to pass, set "
2362 '"__allow_unmapped__ = True" on the class or a '
2363 "superclass this class.",
2364 code="zlpr",
2365 )
2366
2367 else:
2368 return annotated, None
2369
2370 generic_annotated = cast(GenericProtocol[Any], annotated)
2371 if len(generic_annotated.__args__) != 1:
2372 raise orm_exc.MappedAnnotationError(
2373 "Expected sub-type for Mapped[] annotation"
2374 )
2375
2376 return (
2377 # fix dict/list/set args to be ForwardRef, see #11814
2378 fixup_container_fwd_refs(generic_annotated.__args__[0]),
2379 generic_annotated.__origin__,
2380 )
2381
2382
2383def _mapper_property_as_plain_name(prop: Type[Any]) -> str:
2384 if hasattr(prop, "_mapper_property_name"):
2385 name = prop._mapper_property_name()
2386 else:
2387 name = None
2388 return util.clsname_as_plain_name(prop, name)