1# orm/util.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9from __future__ import annotations
10
11import enum
12import functools
13import re
14import types
15import typing
16from typing import AbstractSet
17from typing import Any
18from typing import Callable
19from typing import cast
20from typing import Dict
21from typing import FrozenSet
22from typing import Generic
23from typing import get_origin
24from typing import Iterable
25from typing import Iterator
26from typing import List
27from typing import Literal
28from typing import Match
29from typing import Optional
30from typing import Protocol
31from typing import Sequence
32from typing import Tuple
33from typing import Type
34from typing import TYPE_CHECKING
35from typing import TypeVar
36from typing import Union
37import weakref
38
39from . import attributes # noqa
40from . import exc
41from . import exc as orm_exc
42from ._typing import _O
43from ._typing import insp_is_aliased_class
44from ._typing import insp_is_mapper
45from ._typing import prop_is_relationship
46from .base import _class_to_mapper as _class_to_mapper
47from .base import _MappedAnnotationBase
48from .base import _never_set as _never_set # noqa: F401
49from .base import _none_only_set as _none_only_set # noqa: F401
50from .base import _none_set as _none_set # noqa: F401
51from .base import attribute_str as attribute_str # noqa: F401
52from .base import class_mapper as class_mapper
53from .base import DynamicMapped
54from .base import InspectionAttr as InspectionAttr
55from .base import instance_str as instance_str # noqa: F401
56from .base import Mapped
57from .base import object_mapper as object_mapper
58from .base import object_state as object_state # noqa: F401
59from .base import opt_manager_of_class
60from .base import ORMDescriptor
61from .base import state_attribute_str as state_attribute_str # noqa: F401
62from .base import state_class_str as state_class_str # noqa: F401
63from .base import state_str as state_str # noqa: F401
64from .base import WriteOnlyMapped
65from .interfaces import CriteriaOption
66from .interfaces import MapperProperty as MapperProperty
67from .interfaces import ORMColumnsClauseRole
68from .interfaces import ORMEntityColumnsClauseRole
69from .interfaces import ORMFromClauseRole
70from .path_registry import PathRegistry as PathRegistry
71from .. import event
72from .. import exc as sa_exc
73from .. import inspection
74from .. import sql
75from .. import util
76from ..engine.result import result_tuple
77from ..sql import coercions
78from ..sql import expression
79from ..sql import lambdas
80from ..sql import roles
81from ..sql import util as sql_util
82from ..sql import visitors
83from ..sql._typing import is_selectable
84from ..sql.annotation import SupportsCloneAnnotations
85from ..sql.base import ColumnCollection
86from ..sql.cache_key import HasCacheKey
87from ..sql.cache_key import MemoizedHasCacheKey
88from ..sql.elements import ColumnElement
89from ..sql.elements import KeyedColumnElement
90from ..sql.selectable import FromClause
91from ..util.langhelpers import MemoizedSlots
92from ..util.typing import de_stringify_annotation as _de_stringify_annotation
93from ..util.typing import eval_name_only as _eval_name_only
94from ..util.typing import fixup_container_fwd_refs
95from ..util.typing import is_origin_of_cls
96from ..util.typing import TupleAny
97from ..util.typing import Unpack
98
99if typing.TYPE_CHECKING:
100 from ._typing import _EntityType
101 from ._typing import _IdentityKeyType
102 from ._typing import _InternalEntityType
103 from ._typing import _ORMCOLEXPR
104 from .context import _MapperEntity
105 from .context import _ORMCompileState
106 from .mapper import Mapper
107 from .path_registry import _AbstractEntityRegistry
108 from .query import Query
109 from .relationships import RelationshipProperty
110 from ..engine import Row
111 from ..engine import RowMapping
112 from ..sql._typing import _CE
113 from ..sql._typing import _ColumnExpressionArgument
114 from ..sql._typing import _EquivalentColumnMap
115 from ..sql._typing import _FromClauseArgument
116 from ..sql._typing import _OnClauseArgument
117 from ..sql._typing import _PropagateAttrsType
118 from ..sql.annotation import _SA
119 from ..sql.base import ReadOnlyColumnCollection
120 from ..sql.elements import BindParameter
121 from ..sql.selectable import _ColumnsClauseElement
122 from ..sql.selectable import Select
123 from ..sql.selectable import Selectable
124 from ..sql.visitors import anon_map
125 from ..util.typing import _AnnotationScanType
126
127_T = TypeVar("_T", bound=Any)
128
129all_cascades = frozenset(
130 (
131 "delete",
132 "delete-orphan",
133 "all",
134 "merge",
135 "expunge",
136 "save-update",
137 "refresh-expire",
138 "none",
139 )
140)
141
142_de_stringify_partial = functools.partial(
143 functools.partial,
144 locals_=util.immutabledict(
145 {
146 "Mapped": Mapped,
147 "WriteOnlyMapped": WriteOnlyMapped,
148 "DynamicMapped": DynamicMapped,
149 }
150 ),
151)
152
153# partial is practically useless as we have to write out the whole
154# function and maintain the signature anyway
155
156
157class _DeStringifyAnnotation(Protocol):
158 def __call__(
159 self,
160 cls: Type[Any],
161 annotation: _AnnotationScanType,
162 originating_module: str,
163 *,
164 str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
165 include_generic: bool = False,
166 ) -> Type[Any]: ...
167
168
169de_stringify_annotation = cast(
170 _DeStringifyAnnotation, _de_stringify_partial(_de_stringify_annotation)
171)
172
173
174class _EvalNameOnly(Protocol):
175 def __call__(self, name: str, module_name: str) -> Any: ...
176
177
178eval_name_only = cast(_EvalNameOnly, _de_stringify_partial(_eval_name_only))
179
180
181class CascadeOptions(FrozenSet[str]):
182 """Keeps track of the options sent to
183 :paramref:`.relationship.cascade`"""
184
185 _add_w_all_cascades = all_cascades.difference(
186 ["all", "none", "delete-orphan"]
187 )
188 _allowed_cascades = all_cascades
189
190 _viewonly_cascades = ["expunge", "all", "none", "refresh-expire", "merge"]
191
192 __slots__ = (
193 "save_update",
194 "delete",
195 "refresh_expire",
196 "merge",
197 "expunge",
198 "delete_orphan",
199 )
200
201 save_update: bool
202 delete: bool
203 refresh_expire: bool
204 merge: bool
205 expunge: bool
206 delete_orphan: bool
207
208 def __new__(
209 cls, value_list: Optional[Union[Iterable[str], str]]
210 ) -> CascadeOptions:
211 if isinstance(value_list, str) or value_list is None:
212 return cls.from_string(value_list) # type: ignore
213 values = set(value_list)
214 if values.difference(cls._allowed_cascades):
215 raise sa_exc.ArgumentError(
216 "Invalid cascade option(s): %s"
217 % ", ".join(
218 [
219 repr(x)
220 for x in sorted(
221 values.difference(cls._allowed_cascades)
222 )
223 ]
224 )
225 )
226
227 if "all" in values:
228 values.update(cls._add_w_all_cascades)
229 if "none" in values:
230 values.clear()
231 values.discard("all")
232
233 self = super().__new__(cls, values)
234 self.save_update = "save-update" in values
235 self.delete = "delete" in values
236 self.refresh_expire = "refresh-expire" in values
237 self.merge = "merge" in values
238 self.expunge = "expunge" in values
239 self.delete_orphan = "delete-orphan" in values
240
241 if self.delete_orphan and not self.delete:
242 util.warn("The 'delete-orphan' cascade option requires 'delete'.")
243 return self
244
245 def __repr__(self):
246 return "CascadeOptions(%r)" % (",".join([x for x in sorted(self)]))
247
248 @classmethod
249 def from_string(cls, arg):
250 values = [c for c in re.split(r"\s*,\s*", arg or "") if c]
251 return cls(values)
252
253
254def _validator_events(desc, key, validator, include_removes, include_backrefs):
255 """Runs a validation method on an attribute value to be set or
256 appended.
257 """
258
259 if not include_backrefs:
260
261 def detect_is_backref(state, initiator):
262 impl = state.manager[key].impl
263 return initiator.impl is not impl
264
265 if include_removes:
266
267 def append(state, value, initiator):
268 if initiator.op is not attributes.OP_BULK_REPLACE and (
269 include_backrefs or not detect_is_backref(state, initiator)
270 ):
271 return validator(state.obj(), key, value, False)
272 else:
273 return value
274
275 def bulk_set(state, values, initiator):
276 if include_backrefs or not detect_is_backref(state, initiator):
277 obj = state.obj()
278 values[:] = [
279 validator(obj, key, value, False) for value in values
280 ]
281
282 def set_(state, value, oldvalue, initiator):
283 if include_backrefs or not detect_is_backref(state, initiator):
284 return validator(state.obj(), key, value, False)
285 else:
286 return value
287
288 def remove(state, value, initiator):
289 if include_backrefs or not detect_is_backref(state, initiator):
290 validator(state.obj(), key, value, True)
291
292 else:
293
294 def append(state, value, initiator):
295 if initiator.op is not attributes.OP_BULK_REPLACE and (
296 include_backrefs or not detect_is_backref(state, initiator)
297 ):
298 return validator(state.obj(), key, value)
299 else:
300 return value
301
302 def bulk_set(state, values, initiator):
303 if include_backrefs or not detect_is_backref(state, initiator):
304 obj = state.obj()
305 values[:] = [validator(obj, key, value) for value in values]
306
307 def set_(state, value, oldvalue, initiator):
308 if include_backrefs or not detect_is_backref(state, initiator):
309 return validator(state.obj(), key, value)
310 else:
311 return value
312
313 event.listen(desc, "append", append, raw=True, retval=True)
314 event.listen(desc, "bulk_replace", bulk_set, raw=True)
315 event.listen(desc, "set", set_, raw=True, retval=True)
316 if include_removes:
317 event.listen(desc, "remove", remove, raw=True, retval=True)
318
319
320def polymorphic_union(
321 table_map, typecolname, aliasname="p_union", cast_nulls=True
322):
323 """Create a ``UNION`` statement used by a polymorphic mapper.
324
325 See :ref:`concrete_inheritance` for an example of how
326 this is used.
327
328 :param table_map: mapping of polymorphic identities to
329 :class:`_schema.Table` objects.
330 :param typecolname: string name of a "discriminator" column, which will be
331 derived from the query, producing the polymorphic identity for
332 each row. If ``None``, no polymorphic discriminator is generated.
333 :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()`
334 construct generated.
335 :param cast_nulls: if True, non-existent columns, which are represented
336 as labeled NULLs, will be passed into CAST. This is a legacy behavior
337 that is problematic on some backends such as Oracle - in which case it
338 can be set to False.
339
340 """
341
342 colnames: util.OrderedSet[str] = util.OrderedSet()
343 colnamemaps = {}
344 types = {}
345 for key in table_map:
346 table = table_map[key]
347
348 table = coercions.expect(roles.FromClauseRole, table)
349 table_map[key] = table
350
351 m = {}
352 for c in table.c:
353 if c.key == typecolname:
354 raise sa_exc.InvalidRequestError(
355 "Polymorphic union can't use '%s' as the discriminator "
356 "column due to mapped column %r; please apply the "
357 "'typecolname' "
358 "argument; this is available on "
359 "ConcreteBase as '_concrete_discriminator_name'"
360 % (typecolname, c)
361 )
362 colnames.add(c.key)
363 m[c.key] = c
364 types[c.key] = c.type
365 colnamemaps[table] = m
366
367 def col(name, table):
368 try:
369 return colnamemaps[table][name]
370 except KeyError:
371 if cast_nulls:
372 return sql.cast(sql.null(), types[name]).label(name)
373 else:
374 return sql.type_coerce(sql.null(), types[name]).label(name)
375
376 result = []
377 for type_, table in table_map.items():
378 if typecolname is not None:
379 result.append(
380 sql.select(
381 *(
382 [col(name, table) for name in colnames]
383 + [
384 sql.literal_column(
385 sql_util._quote_ddl_expr(type_)
386 ).label(typecolname)
387 ]
388 )
389 ).select_from(table)
390 )
391 else:
392 result.append(
393 sql.select(
394 *[col(name, table) for name in colnames]
395 ).select_from(table)
396 )
397 return sql.union_all(*result).alias(aliasname)
398
399
400def identity_key(
401 class_: Optional[Type[_T]] = None,
402 ident: Union[Any, Tuple[Any, ...]] = None,
403 *,
404 instance: Optional[_T] = None,
405 row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None,
406 identity_token: Optional[Any] = None,
407) -> _IdentityKeyType[_T]:
408 r"""Generate "identity key" tuples, as are used as keys in the
409 :attr:`.Session.identity_map` dictionary.
410
411 This function has several call styles:
412
413 * ``identity_key(class, ident, identity_token=token)``
414
415 This form receives a mapped class and a primary key scalar or
416 tuple as an argument.
417
418 E.g.::
419
420 >>> identity_key(MyClass, (1, 2))
421 (<class '__main__.MyClass'>, (1, 2), None)
422
423 :param class: mapped class (must be a positional argument)
424 :param ident: primary key, may be a scalar or tuple argument.
425 :param identity_token: optional identity token
426
427 * ``identity_key(instance=instance)``
428
429 This form will produce the identity key for a given instance. The
430 instance need not be persistent, only that its primary key attributes
431 are populated (else the key will contain ``None`` for those missing
432 values).
433
434 E.g.::
435
436 >>> instance = MyClass(1, 2)
437 >>> identity_key(instance=instance)
438 (<class '__main__.MyClass'>, (1, 2), None)
439
440 In this form, the given instance is ultimately run though
441 :meth:`_orm.Mapper.identity_key_from_instance`, which will have the
442 effect of performing a database check for the corresponding row
443 if the object is expired.
444
445 :param instance: object instance (must be given as a keyword arg)
446
447 * ``identity_key(class, row=row, identity_token=token)``
448
449 This form is similar to the class/tuple form, except is passed a
450 database result row as a :class:`.Row` or :class:`.RowMapping` object.
451
452 E.g.::
453
454 >>> row = engine.execute(text("select * from table where a=1 and b=2")).first()
455 >>> identity_key(MyClass, row=row)
456 (<class '__main__.MyClass'>, (1, 2), None)
457
458 :param class: mapped class (must be a positional argument)
459 :param row: :class:`.Row` row returned by a :class:`_engine.CursorResult`
460 (must be given as a keyword arg)
461 :param identity_token: optional identity token
462
463 """ # noqa: E501
464 if class_ is not None:
465 mapper = class_mapper(class_)
466 if row is None:
467 if ident is None:
468 raise sa_exc.ArgumentError("ident or row is required")
469 return mapper.identity_key_from_primary_key(
470 tuple(util.to_list(ident)), identity_token=identity_token
471 )
472 else:
473 return mapper.identity_key_from_row(
474 row, identity_token=identity_token
475 )
476 elif instance is not None:
477 mapper = object_mapper(instance)
478 return mapper.identity_key_from_instance(instance)
479 else:
480 raise sa_exc.ArgumentError("class or instance is required")
481
482
483class _TraceAdaptRole(enum.Enum):
484 """Enumeration of all the use cases for ORMAdapter.
485
486 ORMAdapter remains one of the most complicated aspects of the ORM, as it is
487 used for in-place adaption of column expressions to be applied to a SELECT,
488 replacing :class:`.Table` and other objects that are mapped to classes with
489 aliases of those tables in the case of joined eager loading, or in the case
490 of polymorphic loading as used with concrete mappings or other custom "with
491 polymorphic" parameters, with whole user-defined subqueries. The
492 enumerations provide an overview of all the use cases used by ORMAdapter, a
493 layer of formality as to the introduction of new ORMAdapter use cases (of
494 which none are anticipated), as well as a means to trace the origins of a
495 particular ORMAdapter within runtime debugging.
496
497 SQLAlchemy 2.0 has greatly scaled back ORM features which relied heavily on
498 open-ended statement adaption, including the ``Query.with_polymorphic()``
499 method and the ``Query.select_from_entity()`` methods, favoring
500 user-explicit aliasing schemes using the ``aliased()`` and
501 ``with_polymorphic()`` standalone constructs; these still use adaption,
502 however the adaption is applied in a narrower scope.
503
504 """
505
506 # aliased() use that is used to adapt individual attributes at query
507 # construction time
508 ALIASED_INSP = enum.auto()
509
510 # joinedload cases; typically adapt an ON clause of a relationship
511 # join
512 JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto()
513 JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto()
514 JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto()
515
516 # polymorphic cases - these are complex ones that replace FROM
517 # clauses, replacing tables with subqueries
518 MAPPER_POLYMORPHIC_ADAPTER = enum.auto()
519 WITH_POLYMORPHIC_ADAPTER = enum.auto()
520 WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto()
521 DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto()
522
523 # the from_statement() case, used only to adapt individual attributes
524 # from a given statement to local ORM attributes at result fetching
525 # time. assigned to ORMCompileState._from_obj_alias
526 ADAPT_FROM_STATEMENT = enum.auto()
527
528 # the joinedload for queries that have LIMIT/OFFSET/DISTINCT case;
529 # the query is placed inside of a subquery with the LIMIT/OFFSET/etc.,
530 # joinedloads are then placed on the outside.
531 # assigned to ORMCompileState.compound_eager_adapter
532 COMPOUND_EAGER_STATEMENT = enum.auto()
533
534 # the legacy Query._set_select_from() case.
535 # this is needed for Query's set operations (i.e. UNION, etc. )
536 # as well as "legacy from_self()", which while removed from 2.0 as
537 # public API, is used for the Query.count() method. this one
538 # still does full statement traversal
539 # assigned to ORMCompileState._from_obj_alias
540 LEGACY_SELECT_FROM_ALIAS = enum.auto()
541
542
543class ORMStatementAdapter(sql_util.ColumnAdapter):
544 """ColumnAdapter which includes a role attribute."""
545
546 __slots__ = ("role",)
547
548 def __init__(
549 self,
550 role: _TraceAdaptRole,
551 selectable: Selectable,
552 *,
553 equivalents: Optional[_EquivalentColumnMap] = None,
554 adapt_required: bool = False,
555 allow_label_resolve: bool = True,
556 anonymize_labels: bool = False,
557 adapt_on_names: bool = False,
558 adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
559 ):
560 self.role = role
561 super().__init__(
562 selectable,
563 equivalents=equivalents,
564 adapt_required=adapt_required,
565 allow_label_resolve=allow_label_resolve,
566 anonymize_labels=anonymize_labels,
567 adapt_on_names=adapt_on_names,
568 adapt_from_selectables=adapt_from_selectables,
569 )
570
571
572class ORMAdapter(sql_util.ColumnAdapter):
573 """ColumnAdapter subclass which excludes adaptation of entities from
574 non-matching mappers.
575
576 """
577
578 __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp")
579
580 is_aliased_class: bool
581 aliased_insp: Optional[AliasedInsp[Any]]
582
583 def __init__(
584 self,
585 role: _TraceAdaptRole,
586 entity: _InternalEntityType[Any],
587 *,
588 equivalents: Optional[_EquivalentColumnMap] = None,
589 adapt_required: bool = False,
590 allow_label_resolve: bool = True,
591 anonymize_labels: bool = False,
592 selectable: Optional[Selectable] = None,
593 limit_on_entity: bool = True,
594 adapt_on_names: bool = False,
595 adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
596 ):
597 self.role = role
598 self.mapper = entity.mapper
599 if selectable is None:
600 selectable = entity.selectable
601 if insp_is_aliased_class(entity):
602 self.is_aliased_class = True
603 self.aliased_insp = entity
604 else:
605 self.is_aliased_class = False
606 self.aliased_insp = None
607
608 super().__init__(
609 selectable,
610 equivalents,
611 adapt_required=adapt_required,
612 allow_label_resolve=allow_label_resolve,
613 anonymize_labels=anonymize_labels,
614 include_fn=self._include_fn if limit_on_entity else None,
615 adapt_on_names=adapt_on_names,
616 adapt_from_selectables=adapt_from_selectables,
617 )
618
619 def _include_fn(self, elem):
620 entity = elem._annotations.get("parentmapper", None)
621
622 return not entity or entity.isa(self.mapper) or self.mapper.isa(entity)
623
624
625class AliasedClass(
626 inspection.Inspectable["AliasedInsp[_O]"], ORMColumnsClauseRole[_O]
627):
628 r"""Represents an "aliased" form of a mapped class for usage with Query.
629
630 The ORM equivalent of a :func:`~sqlalchemy.sql.expression.alias`
631 construct, this object mimics the mapped class using a
632 ``__getattr__`` scheme and maintains a reference to a
633 real :class:`~sqlalchemy.sql.expression.Alias` object.
634
635 A primary purpose of :class:`.AliasedClass` is to serve as an alternate
636 within a SQL statement generated by the ORM, such that an existing
637 mapped entity can be used in multiple contexts. A simple example::
638
639 # find all pairs of users with the same name
640 user_alias = aliased(User)
641 session.query(User, user_alias).join(
642 (user_alias, User.id > user_alias.id)
643 ).filter(User.name == user_alias.name)
644
645 :class:`.AliasedClass` is also capable of mapping an existing mapped
646 class to an entirely new selectable, provided this selectable is column-
647 compatible with the existing mapped selectable, and it can also be
648 configured in a mapping as the target of a :func:`_orm.relationship`.
649 See the links below for examples.
650
651 The :class:`.AliasedClass` object is constructed typically using the
652 :func:`_orm.aliased` function. It also is produced with additional
653 configuration when using the :func:`_orm.with_polymorphic` function.
654
655 The resulting object is an instance of :class:`.AliasedClass`.
656 This object implements an attribute scheme which produces the
657 same attribute and method interface as the original mapped
658 class, allowing :class:`.AliasedClass` to be compatible
659 with any attribute technique which works on the original class,
660 including hybrid attributes (see :ref:`hybrids_toplevel`).
661
662 The :class:`.AliasedClass` can be inspected for its underlying
663 :class:`_orm.Mapper`, aliased selectable, and other information
664 using :func:`_sa.inspect`::
665
666 from sqlalchemy import inspect
667
668 my_alias = aliased(MyClass)
669 insp = inspect(my_alias)
670
671 The resulting inspection object is an instance of :class:`.AliasedInsp`.
672
673
674 .. seealso::
675
676 :func:`.aliased`
677
678 :func:`.with_polymorphic`
679
680 :ref:`relationship_aliased_class`
681
682 :ref:`relationship_to_window_function`
683
684
685 """
686
687 __name__: str
688
689 def __init__(
690 self,
691 mapped_class_or_ac: _EntityType[_O],
692 alias: Optional[FromClause] = None,
693 name: Optional[str] = None,
694 flat: bool = False,
695 adapt_on_names: bool = False,
696 with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]] = None,
697 with_polymorphic_discriminator: Optional[ColumnElement[Any]] = None,
698 base_alias: Optional[AliasedInsp[Any]] = None,
699 use_mapper_path: bool = False,
700 represents_outer_join: bool = False,
701 ):
702 insp = cast(
703 "_InternalEntityType[_O]", inspection.inspect(mapped_class_or_ac)
704 )
705 mapper = insp.mapper
706
707 nest_adapters = False
708
709 if alias is None:
710 if insp.is_aliased_class and insp.selectable._is_subquery:
711 alias = insp.selectable.alias()
712 else:
713 alias = (
714 mapper._with_polymorphic_selectable._anonymous_fromclause(
715 name=name,
716 flat=flat,
717 )
718 )
719 elif insp.is_aliased_class:
720 nest_adapters = True
721
722 assert alias is not None
723 self._aliased_insp = AliasedInsp(
724 self,
725 insp,
726 alias,
727 name,
728 (
729 with_polymorphic_mappers
730 if with_polymorphic_mappers
731 else mapper.with_polymorphic_mappers
732 ),
733 (
734 with_polymorphic_discriminator
735 if with_polymorphic_discriminator is not None
736 else mapper.polymorphic_on
737 ),
738 base_alias,
739 use_mapper_path,
740 adapt_on_names,
741 represents_outer_join,
742 nest_adapters,
743 )
744
745 self.__name__ = f"aliased({mapper.class_.__name__})"
746
747 @classmethod
748 def _reconstitute_from_aliased_insp(
749 cls, aliased_insp: AliasedInsp[_O]
750 ) -> AliasedClass[_O]:
751 obj = cls.__new__(cls)
752 obj.__name__ = f"aliased({aliased_insp.mapper.class_.__name__})"
753 obj._aliased_insp = aliased_insp
754
755 if aliased_insp._is_with_polymorphic:
756 for sub_aliased_insp in aliased_insp._with_polymorphic_entities:
757 if sub_aliased_insp is not aliased_insp:
758 ent = AliasedClass._reconstitute_from_aliased_insp(
759 sub_aliased_insp
760 )
761 setattr(obj, sub_aliased_insp.class_.__name__, ent)
762
763 return obj
764
765 def __getattr__(self, key: str) -> Any:
766 try:
767 _aliased_insp = self.__dict__["_aliased_insp"]
768 except KeyError:
769 raise AttributeError()
770 else:
771 target = _aliased_insp._target
772 # maintain all getattr mechanics
773 attr = getattr(target, key)
774
775 # attribute is a method, that will be invoked against a
776 # "self"; so just return a new method with the same function and
777 # new self
778 if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
779 return types.MethodType(attr.__func__, self)
780
781 # attribute is a descriptor, that will be invoked against a
782 # "self"; so invoke the descriptor against this self
783 if hasattr(attr, "__get__"):
784 attr = attr.__get__(None, self)
785
786 # attributes within the QueryableAttribute system will want this
787 # to be invoked so the object can be adapted
788 if hasattr(attr, "adapt_to_entity"):
789 attr = attr.adapt_to_entity(_aliased_insp)
790 setattr(self, key, attr)
791
792 return attr
793
794 def _get_from_serialized(
795 self, key: str, mapped_class: _O, aliased_insp: AliasedInsp[_O]
796 ) -> Any:
797 # this method is only used in terms of the
798 # sqlalchemy.ext.serializer extension
799 attr = getattr(mapped_class, key)
800 if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
801 return types.MethodType(attr.__func__, self)
802
803 # attribute is a descriptor, that will be invoked against a
804 # "self"; so invoke the descriptor against this self
805 if hasattr(attr, "__get__"):
806 attr = attr.__get__(None, self)
807
808 # attributes within the QueryableAttribute system will want this
809 # to be invoked so the object can be adapted
810 if hasattr(attr, "adapt_to_entity"):
811 aliased_insp._weak_entity = weakref.ref(self)
812 attr = attr.adapt_to_entity(aliased_insp)
813 setattr(self, key, attr)
814
815 return attr
816
817 def __repr__(self) -> str:
818 return "<AliasedClass at 0x%x; %s>" % (
819 id(self),
820 self._aliased_insp._target.__name__,
821 )
822
823 def __str__(self) -> str:
824 return str(self._aliased_insp)
825
826
827@inspection._self_inspects
828class AliasedInsp(
829 ORMEntityColumnsClauseRole[_O],
830 ORMFromClauseRole,
831 HasCacheKey,
832 InspectionAttr,
833 MemoizedSlots,
834 inspection.Inspectable["AliasedInsp[_O]"],
835 Generic[_O],
836):
837 """Provide an inspection interface for an
838 :class:`.AliasedClass` object.
839
840 The :class:`.AliasedInsp` object is returned
841 given an :class:`.AliasedClass` using the
842 :func:`_sa.inspect` function::
843
844 from sqlalchemy import inspect
845 from sqlalchemy.orm import aliased
846
847 my_alias = aliased(MyMappedClass)
848 insp = inspect(my_alias)
849
850 Attributes on :class:`.AliasedInsp`
851 include:
852
853 * ``entity`` - the :class:`.AliasedClass` represented.
854 * ``mapper`` - the :class:`_orm.Mapper` mapping the underlying class.
855 * ``selectable`` - the :class:`_expression.Alias`
856 construct which ultimately
857 represents an aliased :class:`_schema.Table` or
858 :class:`_expression.Select`
859 construct.
860 * ``name`` - the name of the alias. Also is used as the attribute
861 name when returned in a result tuple from :class:`_query.Query`.
862 * ``with_polymorphic_mappers`` - collection of :class:`_orm.Mapper`
863 objects
864 indicating all those mappers expressed in the select construct
865 for the :class:`.AliasedClass`.
866 * ``polymorphic_on`` - an alternate column or SQL expression which
867 will be used as the "discriminator" for a polymorphic load.
868
869 .. seealso::
870
871 :ref:`inspection_toplevel`
872
873 """
874
875 __slots__ = (
876 "__weakref__",
877 "_weak_entity",
878 "mapper",
879 "selectable",
880 "name",
881 "_adapt_on_names",
882 "with_polymorphic_mappers",
883 "polymorphic_on",
884 "_use_mapper_path",
885 "_base_alias",
886 "represents_outer_join",
887 "persist_selectable",
888 "local_table",
889 "_is_with_polymorphic",
890 "_with_polymorphic_entities",
891 "_adapter",
892 "_target",
893 "__clause_element__",
894 "_memoized_values",
895 "_all_column_expressions",
896 "_nest_adapters",
897 )
898
899 _cache_key_traversal = [
900 ("name", visitors.ExtendedInternalTraversal.dp_string),
901 ("_adapt_on_names", visitors.ExtendedInternalTraversal.dp_boolean),
902 ("_use_mapper_path", visitors.ExtendedInternalTraversal.dp_boolean),
903 ("_target", visitors.ExtendedInternalTraversal.dp_inspectable),
904 ("selectable", visitors.ExtendedInternalTraversal.dp_clauseelement),
905 (
906 "with_polymorphic_mappers",
907 visitors.InternalTraversal.dp_has_cache_key_list,
908 ),
909 ("polymorphic_on", visitors.InternalTraversal.dp_clauseelement),
910 ]
911
912 mapper: Mapper[_O]
913 selectable: FromClause
914 _adapter: ORMAdapter
915 with_polymorphic_mappers: Sequence[Mapper[Any]]
916 _with_polymorphic_entities: Sequence[AliasedInsp[Any]]
917
918 _weak_entity: weakref.ref[AliasedClass[_O]]
919 """the AliasedClass that refers to this AliasedInsp"""
920
921 _target: Union[Type[_O], AliasedClass[_O]]
922 """the thing referenced by the AliasedClass/AliasedInsp.
923
924 In the vast majority of cases, this is the mapped class. However
925 it may also be another AliasedClass (alias of alias).
926
927 """
928
929 def __init__(
930 self,
931 entity: AliasedClass[_O],
932 inspected: _InternalEntityType[_O],
933 selectable: FromClause,
934 name: Optional[str],
935 with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]],
936 polymorphic_on: Optional[ColumnElement[Any]],
937 _base_alias: Optional[AliasedInsp[Any]],
938 _use_mapper_path: bool,
939 adapt_on_names: bool,
940 represents_outer_join: bool,
941 nest_adapters: bool,
942 ):
943 mapped_class_or_ac = inspected.entity
944 mapper = inspected.mapper
945
946 self._weak_entity = weakref.ref(entity)
947 self.mapper = mapper
948 self.selectable = self.persist_selectable = self.local_table = (
949 selectable
950 )
951 self.name = name
952 self.polymorphic_on = polymorphic_on
953 self._base_alias = weakref.ref(_base_alias or self)
954 self._use_mapper_path = _use_mapper_path
955 self.represents_outer_join = represents_outer_join
956 self._nest_adapters = nest_adapters
957
958 if with_polymorphic_mappers:
959 self._is_with_polymorphic = True
960 self.with_polymorphic_mappers = with_polymorphic_mappers
961 self._with_polymorphic_entities = []
962 for poly in self.with_polymorphic_mappers:
963 if poly is not mapper:
964 ent = AliasedClass(
965 poly.class_,
966 selectable,
967 base_alias=self,
968 adapt_on_names=adapt_on_names,
969 use_mapper_path=_use_mapper_path,
970 )
971
972 setattr(self.entity, poly.class_.__name__, ent)
973 self._with_polymorphic_entities.append(ent._aliased_insp)
974
975 else:
976 self._is_with_polymorphic = False
977 self.with_polymorphic_mappers = [mapper]
978
979 self._adapter = ORMAdapter(
980 _TraceAdaptRole.ALIASED_INSP,
981 mapper,
982 selectable=selectable,
983 equivalents=mapper._equivalent_columns,
984 adapt_on_names=adapt_on_names,
985 anonymize_labels=True,
986 # make sure the adapter doesn't try to grab other tables that
987 # are not even the thing we are mapping, such as embedded
988 # selectables in subqueries or CTEs. See issue #6060
989 adapt_from_selectables={
990 m.selectable
991 for m in self.with_polymorphic_mappers
992 if not adapt_on_names
993 },
994 limit_on_entity=False,
995 )
996
997 if nest_adapters:
998 # supports "aliased class of aliased class" use case
999 assert isinstance(inspected, AliasedInsp)
1000 self._adapter = inspected._adapter.wrap(self._adapter)
1001
1002 self._adapt_on_names = adapt_on_names
1003 self._target = mapped_class_or_ac
1004
1005 @classmethod
1006 def _alias_factory(
1007 cls,
1008 element: Union[_EntityType[_O], FromClause],
1009 alias: Optional[FromClause] = None,
1010 name: Optional[str] = None,
1011 flat: bool = False,
1012 adapt_on_names: bool = False,
1013 ) -> Union[AliasedClass[_O], FromClause]:
1014 if isinstance(element, FromClause):
1015 if adapt_on_names:
1016 raise sa_exc.ArgumentError(
1017 "adapt_on_names only applies to ORM elements"
1018 )
1019 if name:
1020 return element.alias(name=name, flat=flat)
1021 else:
1022 return coercions.expect(
1023 roles.AnonymizedFromClauseRole, element, flat=flat
1024 )
1025 else:
1026 return AliasedClass(
1027 element,
1028 alias=alias,
1029 flat=flat,
1030 name=name,
1031 adapt_on_names=adapt_on_names,
1032 )
1033
1034 @classmethod
1035 def _with_polymorphic_factory(
1036 cls,
1037 base: Union[Type[_O], Mapper[_O]],
1038 classes: Union[Literal["*"], Iterable[_EntityType[Any]]],
1039 selectable: Union[Literal[False, None], FromClause] = False,
1040 flat: bool = False,
1041 polymorphic_on: Optional[ColumnElement[Any]] = None,
1042 aliased: bool = False,
1043 innerjoin: bool = False,
1044 adapt_on_names: bool = False,
1045 name: Optional[str] = None,
1046 _use_mapper_path: bool = False,
1047 ) -> AliasedClass[_O]:
1048 primary_mapper = _class_to_mapper(base)
1049
1050 if selectable not in (None, False) and flat:
1051 raise sa_exc.ArgumentError(
1052 "the 'flat' and 'selectable' arguments cannot be passed "
1053 "simultaneously to with_polymorphic()"
1054 )
1055
1056 mappers, selectable = primary_mapper._with_polymorphic_args(
1057 classes, selectable, innerjoin=innerjoin
1058 )
1059 if aliased or flat:
1060 assert selectable is not None
1061 selectable = selectable._anonymous_fromclause(flat=flat)
1062
1063 return AliasedClass(
1064 base,
1065 selectable,
1066 name=name,
1067 with_polymorphic_mappers=mappers,
1068 adapt_on_names=adapt_on_names,
1069 with_polymorphic_discriminator=polymorphic_on,
1070 use_mapper_path=_use_mapper_path,
1071 represents_outer_join=not innerjoin,
1072 )
1073
1074 @property
1075 def entity(self) -> AliasedClass[_O]:
1076 # to eliminate reference cycles, the AliasedClass is held weakly.
1077 # this produces some situations where the AliasedClass gets lost,
1078 # particularly when one is created internally and only the AliasedInsp
1079 # is passed around.
1080 # to work around this case, we just generate a new one when we need
1081 # it, as it is a simple class with very little initial state on it.
1082 ent = self._weak_entity()
1083 if ent is None:
1084 ent = AliasedClass._reconstitute_from_aliased_insp(self)
1085 self._weak_entity = weakref.ref(ent)
1086 return ent
1087
1088 is_aliased_class = True
1089 "always returns True"
1090
1091 def _memoized_method___clause_element__(self) -> FromClause:
1092 return self.selectable._annotate(
1093 {
1094 "parentmapper": self.mapper,
1095 "parententity": self,
1096 "entity_namespace": self,
1097 }
1098 )._set_propagate_attrs(
1099 {"compile_state_plugin": "orm", "plugin_subject": self}
1100 )
1101
1102 @property
1103 def entity_namespace(self) -> AliasedClass[_O]:
1104 return self.entity
1105
1106 @property
1107 def class_(self) -> Type[_O]:
1108 """Return the mapped class ultimately represented by this
1109 :class:`.AliasedInsp`."""
1110 return self.mapper.class_
1111
1112 @property
1113 def _path_registry(self) -> _AbstractEntityRegistry:
1114 if self._use_mapper_path:
1115 return self.mapper._path_registry
1116 else:
1117 return PathRegistry.per_mapper(self)
1118
1119 def __getstate__(self) -> Dict[str, Any]:
1120 return {
1121 "entity": self.entity,
1122 "mapper": self.mapper,
1123 "alias": self.selectable,
1124 "name": self.name,
1125 "adapt_on_names": self._adapt_on_names,
1126 "with_polymorphic_mappers": self.with_polymorphic_mappers,
1127 "with_polymorphic_discriminator": self.polymorphic_on,
1128 "base_alias": self._base_alias(),
1129 "use_mapper_path": self._use_mapper_path,
1130 "represents_outer_join": self.represents_outer_join,
1131 "nest_adapters": self._nest_adapters,
1132 }
1133
1134 def __setstate__(self, state: Dict[str, Any]) -> None:
1135 self.__init__( # type: ignore
1136 state["entity"],
1137 state["mapper"],
1138 state["alias"],
1139 state["name"],
1140 state["with_polymorphic_mappers"],
1141 state["with_polymorphic_discriminator"],
1142 state["base_alias"],
1143 state["use_mapper_path"],
1144 state["adapt_on_names"],
1145 state["represents_outer_join"],
1146 state["nest_adapters"],
1147 )
1148
1149 def _merge_with(self, other: AliasedInsp[_O]) -> AliasedInsp[_O]:
1150 # assert self._is_with_polymorphic
1151 # assert other._is_with_polymorphic
1152
1153 primary_mapper = other.mapper
1154
1155 assert self.mapper is primary_mapper
1156
1157 our_classes = util.to_set(
1158 mp.class_ for mp in self.with_polymorphic_mappers
1159 )
1160 new_classes = {mp.class_ for mp in other.with_polymorphic_mappers}
1161 if our_classes == new_classes:
1162 return other
1163 else:
1164 classes = our_classes.union(new_classes)
1165
1166 mappers, selectable = primary_mapper._with_polymorphic_args(
1167 classes, None, innerjoin=not other.represents_outer_join
1168 )
1169 selectable = selectable._anonymous_fromclause(flat=True)
1170 return AliasedClass(
1171 primary_mapper,
1172 selectable,
1173 with_polymorphic_mappers=mappers,
1174 with_polymorphic_discriminator=other.polymorphic_on,
1175 use_mapper_path=other._use_mapper_path,
1176 represents_outer_join=other.represents_outer_join,
1177 )._aliased_insp
1178
1179 def _adapt_element(
1180 self, expr: _ORMCOLEXPR, key: Optional[str] = None
1181 ) -> _ORMCOLEXPR:
1182 assert isinstance(expr, ColumnElement)
1183 d: Dict[str, Any] = {
1184 "parententity": self,
1185 "parentmapper": self.mapper,
1186 }
1187 if key:
1188 d["proxy_key"] = key
1189
1190 # userspace adapt of an attribute from AliasedClass; validate that
1191 # it actually was present
1192 adapted = self._adapter.adapt_check_present(expr)
1193 if adapted is None:
1194 adapted = expr
1195 if self._adapter.adapt_on_names:
1196 util.warn_limited(
1197 "Did not locate an expression in selectable for "
1198 "attribute %r; ensure name is correct in expression",
1199 (key,),
1200 )
1201 else:
1202 util.warn_limited(
1203 "Did not locate an expression in selectable for "
1204 "attribute %r; to match by name, use the "
1205 "adapt_on_names parameter",
1206 (key,),
1207 )
1208
1209 return adapted._annotate(d)._set_propagate_attrs(
1210 {"compile_state_plugin": "orm", "plugin_subject": self}
1211 )
1212
1213 if TYPE_CHECKING:
1214 # establish compatibility with the _ORMAdapterProto protocol,
1215 # which in turn is compatible with _CoreAdapterProto.
1216
1217 def _orm_adapt_element(
1218 self,
1219 obj: _CE,
1220 key: Optional[str] = None,
1221 ) -> _CE: ...
1222
1223 else:
1224 _orm_adapt_element = _adapt_element
1225
1226 def _entity_for_mapper(self, mapper):
1227 self_poly = self.with_polymorphic_mappers
1228 if mapper in self_poly:
1229 if mapper is self.mapper:
1230 return self
1231 else:
1232 return getattr(
1233 self.entity, mapper.class_.__name__
1234 )._aliased_insp
1235 elif mapper.isa(self.mapper):
1236 return self
1237 else:
1238 assert False, "mapper %s doesn't correspond to %s" % (mapper, self)
1239
1240 def _memoized_attr__get_clause(self):
1241 onclause, replacemap = self.mapper._get_clause
1242 return (
1243 self._adapter.traverse(onclause),
1244 {
1245 self._adapter.traverse(col): param
1246 for col, param in replacemap.items()
1247 },
1248 )
1249
1250 def _memoized_attr__memoized_values(self):
1251 return {}
1252
1253 def _memoized_attr__all_column_expressions(self):
1254 if self._is_with_polymorphic:
1255 cols_plus_keys = self.mapper._columns_plus_keys(
1256 [ent.mapper for ent in self._with_polymorphic_entities]
1257 )
1258 else:
1259 cols_plus_keys = self.mapper._columns_plus_keys()
1260
1261 cols_plus_keys = [
1262 (key, self._adapt_element(col)) for key, col in cols_plus_keys
1263 ]
1264
1265 return ColumnCollection(cols_plus_keys)
1266
1267 def _memo(self, key, callable_, *args, **kw):
1268 if key in self._memoized_values:
1269 return self._memoized_values[key]
1270 else:
1271 self._memoized_values[key] = value = callable_(*args, **kw)
1272 return value
1273
1274 def __repr__(self):
1275 if self.with_polymorphic_mappers:
1276 with_poly = "(%s)" % ", ".join(
1277 mp.class_.__name__ for mp in self.with_polymorphic_mappers
1278 )
1279 else:
1280 with_poly = ""
1281 return "<AliasedInsp at 0x%x; %s%s>" % (
1282 id(self),
1283 self.class_.__name__,
1284 with_poly,
1285 )
1286
1287 def __str__(self):
1288 if self._is_with_polymorphic:
1289 return "with_polymorphic(%s, [%s])" % (
1290 self._target.__name__,
1291 ", ".join(
1292 mp.class_.__name__
1293 for mp in self.with_polymorphic_mappers
1294 if mp is not self.mapper
1295 ),
1296 )
1297 else:
1298 return "aliased(%s)" % (self._target.__name__,)
1299
1300
1301class _WrapUserEntity:
1302 """A wrapper used within the loader_criteria lambda caller so that
1303 we can bypass declared_attr descriptors on unmapped mixins, which
1304 normally emit a warning for such use.
1305
1306 might also be useful for other per-lambda instrumentations should
1307 the need arise.
1308
1309 """
1310
1311 __slots__ = ("subject",)
1312
1313 def __init__(self, subject):
1314 self.subject = subject
1315
1316 @util.preload_module("sqlalchemy.orm.decl_api")
1317 def __getattribute__(self, name):
1318 decl_api = util.preloaded.orm.decl_api
1319
1320 subject = object.__getattribute__(self, "subject")
1321 if name in subject.__dict__ and isinstance(
1322 subject.__dict__[name], decl_api.declared_attr
1323 ):
1324 return subject.__dict__[name].fget(subject)
1325 else:
1326 return getattr(subject, name)
1327
1328
1329class LoaderCriteriaOption(CriteriaOption):
1330 """Add additional WHERE criteria to the load for all occurrences of
1331 a particular entity.
1332
1333 :class:`_orm.LoaderCriteriaOption` is invoked using the
1334 :func:`_orm.with_loader_criteria` function; see that function for
1335 details.
1336
1337 .. versionadded:: 1.4
1338
1339 """
1340
1341 __slots__ = (
1342 "root_entity",
1343 "entity",
1344 "deferred_where_criteria",
1345 "where_criteria",
1346 "_where_crit_orig",
1347 "include_aliases",
1348 "propagate_to_loaders",
1349 )
1350
1351 _traverse_internals = [
1352 ("root_entity", visitors.ExtendedInternalTraversal.dp_plain_obj),
1353 ("entity", visitors.ExtendedInternalTraversal.dp_has_cache_key),
1354 ("where_criteria", visitors.InternalTraversal.dp_clauseelement),
1355 ("include_aliases", visitors.InternalTraversal.dp_boolean),
1356 ("propagate_to_loaders", visitors.InternalTraversal.dp_boolean),
1357 ]
1358
1359 root_entity: Optional[Type[Any]]
1360 entity: Optional[_InternalEntityType[Any]]
1361 where_criteria: Union[ColumnElement[bool], lambdas.DeferredLambdaElement]
1362 deferred_where_criteria: bool
1363 include_aliases: bool
1364 propagate_to_loaders: bool
1365
1366 _where_crit_orig: Any
1367
1368 def __init__(
1369 self,
1370 entity_or_base: _EntityType[Any],
1371 where_criteria: Union[
1372 _ColumnExpressionArgument[bool],
1373 Callable[[Any], _ColumnExpressionArgument[bool]],
1374 ],
1375 loader_only: bool = False,
1376 include_aliases: bool = False,
1377 propagate_to_loaders: bool = True,
1378 track_closure_variables: bool = True,
1379 ):
1380 entity = cast(
1381 "_InternalEntityType[Any]",
1382 inspection.inspect(entity_or_base, False),
1383 )
1384 if entity is None:
1385 self.root_entity = cast("Type[Any]", entity_or_base)
1386 self.entity = None
1387 else:
1388 self.root_entity = None
1389 self.entity = entity
1390
1391 self._where_crit_orig = where_criteria
1392 if callable(where_criteria):
1393 if self.root_entity is not None:
1394 wrap_entity = self.root_entity
1395 else:
1396 assert entity is not None
1397 wrap_entity = entity.entity
1398
1399 self.deferred_where_criteria = True
1400 self.where_criteria = lambdas.DeferredLambdaElement(
1401 where_criteria,
1402 roles.WhereHavingRole,
1403 lambda_args=(_WrapUserEntity(wrap_entity),),
1404 opts=lambdas.LambdaOptions(
1405 track_closure_variables=track_closure_variables
1406 ),
1407 )
1408 else:
1409 self.deferred_where_criteria = False
1410 self.where_criteria = coercions.expect(
1411 roles.WhereHavingRole, where_criteria
1412 )
1413
1414 self.include_aliases = include_aliases
1415 self.propagate_to_loaders = propagate_to_loaders
1416
1417 @classmethod
1418 def _unreduce(
1419 cls, entity, where_criteria, include_aliases, propagate_to_loaders
1420 ):
1421 return LoaderCriteriaOption(
1422 entity,
1423 where_criteria,
1424 include_aliases=include_aliases,
1425 propagate_to_loaders=propagate_to_loaders,
1426 )
1427
1428 def __reduce__(self):
1429 return (
1430 LoaderCriteriaOption._unreduce,
1431 (
1432 self.entity.class_ if self.entity else self.root_entity,
1433 self._where_crit_orig,
1434 self.include_aliases,
1435 self.propagate_to_loaders,
1436 ),
1437 )
1438
1439 def _all_mappers(self) -> Iterator[Mapper[Any]]:
1440 if self.entity:
1441 yield from self.entity.mapper.self_and_descendants
1442 else:
1443 assert self.root_entity
1444 stack = list(self.root_entity.__subclasses__())
1445 while stack:
1446 subclass = stack.pop(0)
1447 ent = cast(
1448 "_InternalEntityType[Any]",
1449 inspection.inspect(subclass, raiseerr=False),
1450 )
1451 if ent:
1452 yield from ent.mapper.self_and_descendants
1453 else:
1454 stack.extend(subclass.__subclasses__())
1455
1456 def _should_include(self, compile_state: _ORMCompileState) -> bool:
1457 if (
1458 compile_state.select_statement._annotations.get(
1459 "for_loader_criteria", None
1460 )
1461 is self
1462 ):
1463 return False
1464 return True
1465
1466 def _resolve_where_criteria(
1467 self, ext_info: _InternalEntityType[Any]
1468 ) -> ColumnElement[bool]:
1469 if self.deferred_where_criteria:
1470 crit = cast(
1471 "ColumnElement[bool]",
1472 self.where_criteria._resolve_with_args(ext_info.entity),
1473 )
1474 else:
1475 crit = self.where_criteria # type: ignore
1476 assert isinstance(crit, ColumnElement)
1477 return sql_util._deep_annotate(
1478 crit,
1479 {"for_loader_criteria": self},
1480 detect_subquery_cols=True,
1481 ind_cols_on_fromclause=True,
1482 )
1483
1484 def process_compile_state_replaced_entities(
1485 self,
1486 compile_state: _ORMCompileState,
1487 mapper_entities: Iterable[_MapperEntity],
1488 ) -> None:
1489 self.process_compile_state(compile_state)
1490
1491 def process_compile_state(self, compile_state: _ORMCompileState) -> None:
1492 """Apply a modification to a given :class:`.CompileState`."""
1493
1494 # if options to limit the criteria to immediate query only,
1495 # use compile_state.attributes instead
1496
1497 self.get_global_criteria(compile_state.global_attributes)
1498
1499 def get_global_criteria(self, attributes: Dict[Any, Any]) -> None:
1500 for mp in self._all_mappers():
1501 load_criteria = attributes.setdefault(
1502 ("additional_entity_criteria", mp), []
1503 )
1504
1505 load_criteria.append(self)
1506
1507
1508inspection._inspects(AliasedClass)(lambda target: target._aliased_insp)
1509
1510
1511@inspection._inspects(type)
1512def _inspect_mc(
1513 class_: Type[_O],
1514) -> Optional[Mapper[_O]]:
1515 try:
1516 class_manager = opt_manager_of_class(class_)
1517 if class_manager is None or not class_manager.is_mapped:
1518 return None
1519 mapper = class_manager.mapper
1520 except exc.NO_STATE:
1521 return None
1522 else:
1523 return mapper
1524
1525
1526GenericAlias = type(List[Any])
1527
1528
1529@inspection._inspects(GenericAlias)
1530def _inspect_generic_alias(
1531 class_: Type[_O],
1532) -> Optional[Mapper[_O]]:
1533 origin = cast("Type[_O]", get_origin(class_))
1534 return _inspect_mc(origin)
1535
1536
1537@inspection._self_inspects
1538class Bundle(
1539 ORMColumnsClauseRole[_T],
1540 SupportsCloneAnnotations,
1541 MemoizedHasCacheKey,
1542 inspection.Inspectable["Bundle[_T]"],
1543 InspectionAttr,
1544):
1545 """A grouping of SQL expressions that are returned by a :class:`.Query`
1546 under one namespace.
1547
1548 The :class:`.Bundle` essentially allows nesting of the tuple-based
1549 results returned by a column-oriented :class:`_query.Query` object.
1550 It also
1551 is extensible via simple subclassing, where the primary capability
1552 to override is that of how the set of expressions should be returned,
1553 allowing post-processing as well as custom return types, without
1554 involving ORM identity-mapped classes.
1555
1556 .. seealso::
1557
1558 :ref:`bundles`
1559
1560
1561 """
1562
1563 single_entity = False
1564 """If True, queries for a single Bundle will be returned as a single
1565 entity, rather than an element within a keyed tuple."""
1566
1567 is_clause_element = False
1568
1569 is_mapper = False
1570
1571 is_aliased_class = False
1572
1573 is_bundle = True
1574
1575 _propagate_attrs: _PropagateAttrsType = util.immutabledict()
1576
1577 proxy_set = util.EMPTY_SET
1578
1579 exprs: List[_ColumnsClauseElement]
1580
1581 def __init__(
1582 self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
1583 ):
1584 r"""Construct a new :class:`.Bundle`.
1585
1586 e.g.::
1587
1588 bn = Bundle("mybundle", MyClass.x, MyClass.y)
1589
1590 for row in session.query(bn).filter(bn.c.x == 5).filter(bn.c.y == 4):
1591 print(row.mybundle.x, row.mybundle.y)
1592
1593 :param name: name of the bundle.
1594 :param \*exprs: columns or SQL expressions comprising the bundle.
1595 :param single_entity=False: if True, rows for this :class:`.Bundle`
1596 can be returned as a "single entity" outside of any enclosing tuple
1597 in the same manner as a mapped entity.
1598
1599 """ # noqa: E501
1600 self.name = self._label = name
1601 coerced_exprs = [
1602 coercions.expect(
1603 roles.ColumnsClauseRole, expr, apply_propagate_attrs=self
1604 )
1605 for expr in exprs
1606 ]
1607 self.exprs = coerced_exprs
1608
1609 self.c = self.columns = ColumnCollection(
1610 (getattr(col, "key", col._label), col)
1611 for col in [e._annotations.get("bundle", e) for e in coerced_exprs]
1612 ).as_readonly()
1613 self.single_entity = kw.pop("single_entity", self.single_entity)
1614
1615 def _gen_cache_key(
1616 self, anon_map: anon_map, bindparams: List[BindParameter[Any]]
1617 ) -> Tuple[Any, ...]:
1618 return (self.__class__, self.name, self.single_entity) + tuple(
1619 [expr._gen_cache_key(anon_map, bindparams) for expr in self.exprs]
1620 )
1621
1622 @property
1623 def mapper(self) -> Optional[Mapper[Any]]:
1624 mp: Optional[Mapper[Any]] = self.exprs[0]._annotations.get(
1625 "parentmapper", None
1626 )
1627 return mp
1628
1629 @property
1630 def entity(self) -> Optional[_InternalEntityType[Any]]:
1631 ie: Optional[_InternalEntityType[Any]] = self.exprs[
1632 0
1633 ]._annotations.get("parententity", None)
1634 return ie
1635
1636 @property
1637 def entity_namespace(
1638 self,
1639 ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]:
1640 return self.c
1641
1642 columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
1643
1644 """A namespace of SQL expressions referred to by this :class:`.Bundle`.
1645
1646 e.g.::
1647
1648 bn = Bundle("mybundle", MyClass.x, MyClass.y)
1649
1650 q = sess.query(bn).filter(bn.c.x == 5)
1651
1652 Nesting of bundles is also supported::
1653
1654 b1 = Bundle(
1655 "b1",
1656 Bundle("b2", MyClass.a, MyClass.b),
1657 Bundle("b3", MyClass.x, MyClass.y),
1658 )
1659
1660 q = sess.query(b1).filter(b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9)
1661
1662 .. seealso::
1663
1664 :attr:`.Bundle.c`
1665
1666 """ # noqa: E501
1667
1668 c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
1669 """An alias for :attr:`.Bundle.columns`."""
1670
1671 def _clone(self, **kw):
1672 cloned = self.__class__.__new__(self.__class__)
1673 cloned.__dict__.update(self.__dict__)
1674 return cloned
1675
1676 def __clause_element__(self):
1677 # ensure existing entity_namespace remains
1678 annotations = {"bundle": self, "entity_namespace": self}
1679 annotations.update(self._annotations)
1680
1681 plugin_subject = self.exprs[0]._propagate_attrs.get(
1682 "plugin_subject", self.entity
1683 )
1684 return (
1685 expression.ClauseList(
1686 _literal_as_text_role=roles.ColumnsClauseRole,
1687 group=False,
1688 *[e._annotations.get("bundle", e) for e in self.exprs],
1689 )
1690 ._annotate(annotations)
1691 ._set_propagate_attrs(
1692 # the Bundle *must* use the orm plugin no matter what. the
1693 # subject can be None but it's much better if it's not.
1694 {
1695 "compile_state_plugin": "orm",
1696 "plugin_subject": plugin_subject,
1697 }
1698 )
1699 )
1700
1701 @property
1702 def clauses(self):
1703 return self.__clause_element__().clauses
1704
1705 def label(self, name):
1706 """Provide a copy of this :class:`.Bundle` passing a new label."""
1707
1708 cloned = self._clone()
1709 cloned.name = name
1710 return cloned
1711
1712 def create_row_processor(
1713 self,
1714 query: Select[Unpack[TupleAny]],
1715 procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]],
1716 labels: Sequence[str],
1717 ) -> Callable[[Row[Unpack[TupleAny]]], Any]:
1718 """Produce the "row processing" function for this :class:`.Bundle`.
1719
1720 May be overridden by subclasses to provide custom behaviors when
1721 results are fetched. The method is passed the statement object and a
1722 set of "row processor" functions at query execution time; these
1723 processor functions when given a result row will return the individual
1724 attribute value, which can then be adapted into any kind of return data
1725 structure.
1726
1727 The example below illustrates replacing the usual :class:`.Row`
1728 return structure with a straight Python dictionary::
1729
1730 from sqlalchemy.orm import Bundle
1731
1732
1733 class DictBundle(Bundle):
1734 def create_row_processor(self, query, procs, labels):
1735 "Override create_row_processor to return values as dictionaries"
1736
1737 def proc(row):
1738 return dict(zip(labels, (proc(row) for proc in procs)))
1739
1740 return proc
1741
1742 A result from the above :class:`_orm.Bundle` will return dictionary
1743 values::
1744
1745 bn = DictBundle("mybundle", MyClass.data1, MyClass.data2)
1746 for row in session.execute(select(bn)).where(bn.c.data1 == "d1"):
1747 print(row.mybundle["data1"], row.mybundle["data2"])
1748
1749 """ # noqa: E501
1750 keyed_tuple = result_tuple(labels, [() for l in labels])
1751
1752 def proc(row: Row[Unpack[TupleAny]]) -> Any:
1753 return keyed_tuple([proc(row) for proc in procs])
1754
1755 return proc
1756
1757
1758def _orm_full_deannotate(element: _SA) -> _SA:
1759 return sql_util._deep_deannotate(element)
1760
1761
1762class _ORMJoin(expression.Join):
1763 """Extend Join to support ORM constructs as input."""
1764
1765 __visit_name__ = expression.Join.__visit_name__
1766
1767 inherit_cache = True
1768
1769 def __init__(
1770 self,
1771 left: _FromClauseArgument,
1772 right: _FromClauseArgument,
1773 onclause: Optional[_OnClauseArgument] = None,
1774 isouter: bool = False,
1775 full: bool = False,
1776 _left_memo: Optional[Any] = None,
1777 _right_memo: Optional[Any] = None,
1778 _extra_criteria: Tuple[ColumnElement[bool], ...] = (),
1779 ):
1780 left_info = cast(
1781 "Union[FromClause, _InternalEntityType[Any]]",
1782 inspection.inspect(left),
1783 )
1784
1785 right_info = cast(
1786 "Union[FromClause, _InternalEntityType[Any]]",
1787 inspection.inspect(right),
1788 )
1789 adapt_to = right_info.selectable
1790
1791 # used by joined eager loader
1792 self._left_memo = _left_memo
1793 self._right_memo = _right_memo
1794
1795 if isinstance(onclause, attributes.QueryableAttribute):
1796 if TYPE_CHECKING:
1797 assert isinstance(
1798 onclause.comparator, RelationshipProperty.Comparator
1799 )
1800 on_selectable = onclause.comparator._source_selectable()
1801 prop = onclause.property
1802 _extra_criteria += onclause._extra_criteria
1803 elif isinstance(onclause, MapperProperty):
1804 # used internally by joined eager loader...possibly not ideal
1805 prop = onclause
1806 on_selectable = prop.parent.selectable
1807 else:
1808 prop = None
1809 on_selectable = None
1810
1811 left_selectable = left_info.selectable
1812 if prop:
1813 adapt_from: Optional[FromClause]
1814 if sql_util.clause_is_present(on_selectable, left_selectable):
1815 adapt_from = on_selectable
1816 else:
1817 assert isinstance(left_selectable, FromClause)
1818 adapt_from = left_selectable
1819
1820 (
1821 pj,
1822 sj,
1823 source,
1824 dest,
1825 secondary,
1826 target_adapter,
1827 ) = prop._create_joins(
1828 source_selectable=adapt_from,
1829 dest_selectable=adapt_to,
1830 source_polymorphic=True,
1831 of_type_entity=right_info,
1832 alias_secondary=True,
1833 extra_criteria=_extra_criteria,
1834 )
1835
1836 if sj is not None:
1837 if isouter:
1838 # note this is an inner join from secondary->right
1839 right = sql.join(secondary, right, sj)
1840 onclause = pj
1841 else:
1842 left = sql.join(left, secondary, pj, isouter)
1843 onclause = sj
1844 else:
1845 onclause = pj
1846
1847 self._target_adapter = target_adapter
1848
1849 # we don't use the normal coercions logic for _ORMJoin
1850 # (probably should), so do some gymnastics to get the entity.
1851 # logic here is for #8721, which was a major bug in 1.4
1852 # for almost two years, not reported/fixed until 1.4.43 (!)
1853 if is_selectable(left_info):
1854 parententity = left_selectable._annotations.get(
1855 "parententity", None
1856 )
1857 elif insp_is_mapper(left_info) or insp_is_aliased_class(left_info):
1858 parententity = left_info
1859 else:
1860 parententity = None
1861
1862 if parententity is not None:
1863 self._annotations = self._annotations.union(
1864 {"parententity": parententity}
1865 )
1866
1867 augment_onclause = bool(_extra_criteria) and not prop
1868 expression.Join.__init__(self, left, right, onclause, isouter, full)
1869
1870 assert self.onclause is not None
1871
1872 if augment_onclause:
1873 self.onclause &= sql.and_(*_extra_criteria)
1874
1875 if (
1876 not prop
1877 and getattr(right_info, "mapper", None)
1878 and right_info.mapper.single # type: ignore
1879 ):
1880 right_info = cast("_InternalEntityType[Any]", right_info)
1881 # if single inheritance target and we are using a manual
1882 # or implicit ON clause, augment it the same way we'd augment the
1883 # WHERE.
1884 single_crit = right_info.mapper._single_table_criterion
1885 if single_crit is not None:
1886 if insp_is_aliased_class(right_info):
1887 single_crit = right_info._adapter.traverse(single_crit)
1888 self.onclause = self.onclause & single_crit
1889
1890 def _splice_into_center(self, other):
1891 """Splice a join into the center.
1892
1893 Given join(a, b) and join(b, c), return join(a, b).join(c)
1894
1895 """
1896 leftmost = other
1897 while isinstance(leftmost, sql.Join):
1898 leftmost = leftmost.left
1899
1900 assert self.right is leftmost
1901
1902 left = _ORMJoin(
1903 self.left,
1904 other.left,
1905 self.onclause,
1906 isouter=self.isouter,
1907 _left_memo=self._left_memo,
1908 _right_memo=other._left_memo._path_registry,
1909 )
1910
1911 return _ORMJoin(
1912 left,
1913 other.right,
1914 other.onclause,
1915 isouter=other.isouter,
1916 _right_memo=other._right_memo,
1917 )
1918
1919 def join(
1920 self,
1921 right: _FromClauseArgument,
1922 onclause: Optional[_OnClauseArgument] = None,
1923 isouter: bool = False,
1924 full: bool = False,
1925 ) -> _ORMJoin:
1926 return _ORMJoin(self, right, onclause, full=full, isouter=isouter)
1927
1928 def outerjoin(
1929 self,
1930 right: _FromClauseArgument,
1931 onclause: Optional[_OnClauseArgument] = None,
1932 full: bool = False,
1933 ) -> _ORMJoin:
1934 return _ORMJoin(self, right, onclause, isouter=True, full=full)
1935
1936
1937def with_parent(
1938 instance: object,
1939 prop: attributes.QueryableAttribute[Any],
1940 from_entity: Optional[_EntityType[Any]] = None,
1941) -> ColumnElement[bool]:
1942 """Create filtering criterion that relates this query's primary entity
1943 to the given related instance, using established
1944 :func:`_orm.relationship()`
1945 configuration.
1946
1947 E.g.::
1948
1949 stmt = select(Address).where(with_parent(some_user, User.addresses))
1950
1951 The SQL rendered is the same as that rendered when a lazy loader
1952 would fire off from the given parent on that attribute, meaning
1953 that the appropriate state is taken from the parent object in
1954 Python without the need to render joins to the parent table
1955 in the rendered statement.
1956
1957 The given property may also make use of :meth:`_orm.PropComparator.of_type`
1958 to indicate the left side of the criteria::
1959
1960
1961 a1 = aliased(Address)
1962 a2 = aliased(Address)
1963 stmt = select(a1, a2).where(with_parent(u1, User.addresses.of_type(a2)))
1964
1965 The above use is equivalent to using the
1966 :func:`_orm.with_parent.from_entity` argument::
1967
1968 a1 = aliased(Address)
1969 a2 = aliased(Address)
1970 stmt = select(a1, a2).where(
1971 with_parent(u1, User.addresses, from_entity=a2)
1972 )
1973
1974 :param instance:
1975 An instance which has some :func:`_orm.relationship`.
1976
1977 :param property:
1978 Class-bound attribute, which indicates
1979 what relationship from the instance should be used to reconcile the
1980 parent/child relationship.
1981
1982 :param from_entity:
1983 Entity in which to consider as the left side. This defaults to the
1984 "zero" entity of the :class:`_query.Query` itself.
1985
1986 """ # noqa: E501
1987 prop_t: RelationshipProperty[Any]
1988
1989 if isinstance(prop, str):
1990 raise sa_exc.ArgumentError(
1991 "with_parent() accepts class-bound mapped attributes, not strings"
1992 )
1993 elif isinstance(prop, attributes.QueryableAttribute):
1994 if prop._of_type:
1995 from_entity = prop._of_type
1996 mapper_property = prop.property
1997 if mapper_property is None or not prop_is_relationship(
1998 mapper_property
1999 ):
2000 raise sa_exc.ArgumentError(
2001 f"Expected relationship property for with_parent(), "
2002 f"got {mapper_property}"
2003 )
2004 prop_t = mapper_property
2005 else:
2006 prop_t = prop
2007
2008 return prop_t._with_parent(instance, from_entity=from_entity)
2009
2010
2011def has_identity(object_: object) -> bool:
2012 """Return True if the given object has a database
2013 identity.
2014
2015 This typically corresponds to the object being
2016 in either the persistent or detached state.
2017
2018 .. seealso::
2019
2020 :func:`.was_deleted`
2021
2022 """
2023 state = attributes.instance_state(object_)
2024 return state.has_identity
2025
2026
2027def was_deleted(object_: object) -> bool:
2028 """Return True if the given object was deleted
2029 within a session flush.
2030
2031 This is regardless of whether or not the object is
2032 persistent or detached.
2033
2034 .. seealso::
2035
2036 :attr:`.InstanceState.was_deleted`
2037
2038 """
2039
2040 state = attributes.instance_state(object_)
2041 return state.was_deleted
2042
2043
2044def _entity_corresponds_to(
2045 given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
2046) -> bool:
2047 """determine if 'given' corresponds to 'entity', in terms
2048 of an entity passed to Query that would match the same entity
2049 being referred to elsewhere in the query.
2050
2051 """
2052 if insp_is_aliased_class(entity):
2053 if insp_is_aliased_class(given):
2054 if entity._base_alias() is given._base_alias():
2055 return True
2056 return False
2057 elif insp_is_aliased_class(given):
2058 if given._use_mapper_path:
2059 return entity in given.with_polymorphic_mappers
2060 else:
2061 return entity is given
2062
2063 assert insp_is_mapper(given)
2064 return entity.common_parent(given)
2065
2066
2067def _entity_corresponds_to_use_path_impl(
2068 given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
2069) -> bool:
2070 """determine if 'given' corresponds to 'entity', in terms
2071 of a path of loader options where a mapped attribute is taken to
2072 be a member of a parent entity.
2073
2074 e.g.::
2075
2076 someoption(A).someoption(A.b) # -> fn(A, A) -> True
2077 someoption(A).someoption(C.d) # -> fn(A, C) -> False
2078
2079 a1 = aliased(A)
2080 someoption(a1).someoption(A.b) # -> fn(a1, A) -> False
2081 someoption(a1).someoption(a1.b) # -> fn(a1, a1) -> True
2082
2083 wp = with_polymorphic(A, [A1, A2])
2084 someoption(wp).someoption(A1.foo) # -> fn(wp, A1) -> False
2085 someoption(wp).someoption(wp.A1.foo) # -> fn(wp, wp.A1) -> True
2086
2087 """
2088 if insp_is_aliased_class(given):
2089 return (
2090 insp_is_aliased_class(entity)
2091 and not entity._use_mapper_path
2092 and (given is entity or entity in given._with_polymorphic_entities)
2093 )
2094 elif not insp_is_aliased_class(entity):
2095 return given.isa(entity.mapper)
2096 else:
2097 return (
2098 entity._use_mapper_path
2099 and given in entity.with_polymorphic_mappers
2100 )
2101
2102
2103def _entity_isa(given: _InternalEntityType[Any], mapper: Mapper[Any]) -> bool:
2104 """determine if 'given' "is a" mapper, in terms of the given
2105 would load rows of type 'mapper'.
2106
2107 """
2108 if given.is_aliased_class:
2109 return mapper in given.with_polymorphic_mappers or given.mapper.isa(
2110 mapper
2111 )
2112 elif given.with_polymorphic_mappers:
2113 return mapper in given.with_polymorphic_mappers or given.isa(mapper)
2114 else:
2115 return given.isa(mapper)
2116
2117
2118def _getitem(iterable_query: Query[Any], item: Any) -> Any:
2119 """calculate __getitem__ in terms of an iterable query object
2120 that also has a slice() method.
2121
2122 """
2123
2124 def _no_negative_indexes():
2125 raise IndexError(
2126 "negative indexes are not accepted by SQL "
2127 "index / slice operators"
2128 )
2129
2130 if isinstance(item, slice):
2131 start, stop, step = util.decode_slice(item)
2132
2133 if (
2134 isinstance(stop, int)
2135 and isinstance(start, int)
2136 and stop - start <= 0
2137 ):
2138 return []
2139
2140 elif (isinstance(start, int) and start < 0) or (
2141 isinstance(stop, int) and stop < 0
2142 ):
2143 _no_negative_indexes()
2144
2145 res = iterable_query.slice(start, stop)
2146 if step is not None:
2147 return list(res)[None : None : item.step]
2148 else:
2149 return list(res)
2150 else:
2151 if item == -1:
2152 _no_negative_indexes()
2153 else:
2154 return list(iterable_query[item : item + 1])[0]
2155
2156
2157def _is_mapped_annotation(
2158 raw_annotation: _AnnotationScanType,
2159 cls: Type[Any],
2160 originating_cls: Type[Any],
2161) -> bool:
2162 try:
2163 annotated = de_stringify_annotation(
2164 cls, raw_annotation, originating_cls.__module__
2165 )
2166 except NameError:
2167 # in most cases, at least within our own tests, we can raise
2168 # here, which is more accurate as it prevents us from returning
2169 # false negatives. However, in the real world, try to avoid getting
2170 # involved with end-user annotations that have nothing to do with us.
2171 # see issue #8888 where we bypass using this function in the case
2172 # that we want to detect an unresolvable Mapped[] type.
2173 return False
2174 else:
2175 return is_origin_of_cls(annotated, _MappedAnnotationBase)
2176
2177
2178class _CleanupError(Exception):
2179 pass
2180
2181
2182def _cleanup_mapped_str_annotation(
2183 annotation: str, originating_module: str
2184) -> str:
2185 # fix up an annotation that comes in as the form:
2186 # 'Mapped[List[Address]]' so that it instead looks like:
2187 # 'Mapped[List["Address"]]' , which will allow us to get
2188 # "Address" as a string
2189
2190 # additionally, resolve symbols for these names since this is where
2191 # we'd have to do it
2192
2193 inner: Optional[Match[str]]
2194
2195 mm = re.match(r"^([^ \|]+?)\[(.+)\]$", annotation)
2196
2197 if not mm:
2198 return annotation
2199
2200 # ticket #8759. Resolve the Mapped name to a real symbol.
2201 # originally this just checked the name.
2202 try:
2203 obj = eval_name_only(mm.group(1), originating_module)
2204 except NameError as ne:
2205 raise _CleanupError(
2206 f'For annotation "{annotation}", could not resolve '
2207 f'container type "{mm.group(1)}". '
2208 "Please ensure this type is imported at the module level "
2209 "outside of TYPE_CHECKING blocks"
2210 ) from ne
2211
2212 if obj is typing.ClassVar:
2213 real_symbol = "ClassVar"
2214 else:
2215 try:
2216 if issubclass(obj, _MappedAnnotationBase):
2217 real_symbol = obj.__name__
2218 else:
2219 return annotation
2220 except TypeError:
2221 # avoid isinstance(obj, type) check, just catch TypeError
2222 return annotation
2223
2224 # note: if one of the codepaths above didn't define real_symbol and
2225 # then didn't return, real_symbol raises UnboundLocalError
2226 # which is actually a NameError, and the calling routines don't
2227 # notice this since they are catching NameError anyway. Just in case
2228 # this is being modified in the future, something to be aware of.
2229
2230 stack = []
2231 inner = mm
2232 while True:
2233 stack.append(real_symbol if mm is inner else inner.group(1))
2234 g2 = inner.group(2)
2235 inner = re.match(r"^([^ \|]+?)\[(.+)\]$", g2)
2236 if inner is None:
2237 stack.append(g2)
2238 break
2239
2240 # stacks we want to rewrite, that is, quote the last entry which
2241 # we think is a relationship class name:
2242 #
2243 # ['Mapped', 'List', 'Address']
2244 # ['Mapped', 'A']
2245 #
2246 # stacks we dont want to rewrite, which are generally MappedColumn
2247 # use cases:
2248 #
2249 # ['Mapped', "'Optional[Dict[str, str]]'"]
2250 # ['Mapped', 'dict[str, str] | None']
2251
2252 if (
2253 # avoid already quoted symbols such as
2254 # ['Mapped', "'Optional[Dict[str, str]]'"]
2255 not re.match(r"""^["'].*["']$""", stack[-1])
2256 # avoid further generics like Dict[] such as
2257 # ['Mapped', 'dict[str, str] | None'],
2258 # ['Mapped', 'list[int] | list[str]'],
2259 # ['Mapped', 'Union[list[int], list[str]]'],
2260 and not re.search(r"[\[\]]", stack[-1])
2261 ):
2262 stripchars = "\"' "
2263 stack[-1] = ", ".join(
2264 f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",")
2265 )
2266
2267 annotation = "[".join(stack) + ("]" * (len(stack) - 1))
2268
2269 return annotation
2270
2271
2272def _extract_mapped_subtype(
2273 raw_annotation: Optional[_AnnotationScanType],
2274 cls: type,
2275 originating_module: str,
2276 key: str,
2277 attr_cls: Type[Any],
2278 required: bool,
2279 is_dataclass_field: bool,
2280 expect_mapped: bool = True,
2281 raiseerr: bool = True,
2282) -> Optional[Tuple[Union[_AnnotationScanType, str], Optional[type]]]:
2283 """given an annotation, figure out if it's ``Mapped[something]`` and if
2284 so, return the ``something`` part.
2285
2286 Includes error raise scenarios and other options.
2287
2288 """
2289
2290 if raw_annotation is None:
2291 if required:
2292 raise orm_exc.MappedAnnotationError(
2293 f"Python typing annotation is required for attribute "
2294 f'"{cls.__name__}.{key}" when primary argument(s) for '
2295 f'"{attr_cls.__name__}" construct are None or not present'
2296 )
2297 return None
2298
2299 try:
2300 # destringify the "outside" of the annotation. note we are not
2301 # adding include_generic so it will *not* dig into generic contents,
2302 # which will remain as ForwardRef or plain str under future annotations
2303 # mode. The full destringify happens later when mapped_column goes
2304 # to do a full lookup in the registry type_annotations_map.
2305 annotated = de_stringify_annotation(
2306 cls,
2307 raw_annotation,
2308 originating_module,
2309 str_cleanup_fn=_cleanup_mapped_str_annotation,
2310 )
2311 except _CleanupError as ce:
2312 raise orm_exc.MappedAnnotationError(
2313 f"Could not interpret annotation {raw_annotation}. "
2314 "Check that it uses names that are correctly imported at the "
2315 "module level. See chained stack trace for more hints."
2316 ) from ce
2317 except NameError as ne:
2318 if raiseerr and "Mapped[" in raw_annotation: # type: ignore
2319 raise orm_exc.MappedAnnotationError(
2320 f"Could not interpret annotation {raw_annotation}. "
2321 "Check that it uses names that are correctly imported at the "
2322 "module level. See chained stack trace for more hints."
2323 ) from ne
2324
2325 annotated = raw_annotation # type: ignore
2326
2327 if is_dataclass_field:
2328 return annotated, None
2329 else:
2330 if not hasattr(annotated, "__origin__") or not is_origin_of_cls(
2331 annotated, _MappedAnnotationBase
2332 ):
2333 if expect_mapped:
2334 if not raiseerr:
2335 return None
2336
2337 origin = getattr(annotated, "__origin__", None)
2338 if origin is typing.ClassVar:
2339 return None
2340
2341 # check for other kind of ORM descriptor like AssociationProxy,
2342 # don't raise for that (issue #9957)
2343 elif isinstance(origin, type) and issubclass(
2344 origin, ORMDescriptor
2345 ):
2346 return None
2347
2348 raise orm_exc.MappedAnnotationError(
2349 f'Type annotation for "{cls.__name__}.{key}" '
2350 "can't be correctly interpreted for "
2351 "Annotated Declarative Table form. ORM annotations "
2352 "should normally make use of the ``Mapped[]`` generic "
2353 "type, or other ORM-compatible generic type, as a "
2354 "container for the actual type, which indicates the "
2355 "intent that the attribute is mapped. "
2356 "Class variables that are not intended to be mapped "
2357 "by the ORM should use ClassVar[]. "
2358 "To allow Annotated Declarative to disregard legacy "
2359 "annotations which don't use Mapped[] to pass, set "
2360 '"__allow_unmapped__ = True" on the class or a '
2361 "superclass this class.",
2362 code="zlpr",
2363 )
2364
2365 else:
2366 return annotated, None
2367
2368 if len(annotated.__args__) != 1:
2369 raise orm_exc.MappedAnnotationError(
2370 "Expected sub-type for Mapped[] annotation"
2371 )
2372
2373 return (
2374 # fix dict/list/set args to be ForwardRef, see #11814
2375 fixup_container_fwd_refs(annotated.__args__[0]),
2376 annotated.__origin__,
2377 )
2378
2379
2380def _mapper_property_as_plain_name(prop: Type[Any]) -> str:
2381 if hasattr(prop, "_mapper_property_name"):
2382 name = prop._mapper_property_name()
2383 else:
2384 name = None
2385 return util.clsname_as_plain_name(prop, name)