1# orm/util.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9from __future__ import annotations
10
11import enum
12import functools
13import re
14import types
15import typing
16from typing import AbstractSet
17from typing import Any
18from typing import Callable
19from typing import cast
20from typing import Dict
21from typing import FrozenSet
22from typing import Generic
23from typing import Iterable
24from typing import Iterator
25from typing import List
26from typing import Match
27from typing import Optional
28from typing import Protocol
29from typing import Sequence
30from typing import Tuple
31from typing import Type
32from typing import TYPE_CHECKING
33from typing import TypeVar
34from typing import Union
35import weakref
36
37from . import attributes # noqa
38from . import exc
39from . import exc as orm_exc
40from ._typing import _O
41from ._typing import insp_is_aliased_class
42from ._typing import insp_is_mapper
43from ._typing import prop_is_relationship
44from .base import _class_to_mapper as _class_to_mapper
45from .base import _MappedAnnotationBase
46from .base import _never_set as _never_set # noqa: F401
47from .base import _none_only_set as _none_only_set # noqa: F401
48from .base import _none_set as _none_set # noqa: F401
49from .base import attribute_str as attribute_str # noqa: F401
50from .base import class_mapper as class_mapper
51from .base import DynamicMapped
52from .base import InspectionAttr as InspectionAttr
53from .base import instance_str as instance_str # noqa: F401
54from .base import Mapped
55from .base import object_mapper as object_mapper
56from .base import object_state as object_state # noqa: F401
57from .base import opt_manager_of_class
58from .base import ORMDescriptor
59from .base import state_attribute_str as state_attribute_str # noqa: F401
60from .base import state_class_str as state_class_str # noqa: F401
61from .base import state_str as state_str # noqa: F401
62from .base import WriteOnlyMapped
63from .interfaces import CriteriaOption
64from .interfaces import MapperProperty as MapperProperty
65from .interfaces import ORMColumnsClauseRole
66from .interfaces import ORMEntityColumnsClauseRole
67from .interfaces import ORMFromClauseRole
68from .path_registry import PathRegistry as PathRegistry
69from .. import event
70from .. import exc as sa_exc
71from .. import inspection
72from .. import sql
73from .. import util
74from ..engine.result import result_tuple
75from ..sql import coercions
76from ..sql import expression
77from ..sql import lambdas
78from ..sql import roles
79from ..sql import util as sql_util
80from ..sql import visitors
81from ..sql._typing import is_selectable
82from ..sql.annotation import SupportsCloneAnnotations
83from ..sql.base import ColumnCollection
84from ..sql.cache_key import HasCacheKey
85from ..sql.cache_key import MemoizedHasCacheKey
86from ..sql.elements import ColumnElement
87from ..sql.elements import KeyedColumnElement
88from ..sql.selectable import FromClause
89from ..util.langhelpers import MemoizedSlots
90from ..util.typing import de_stringify_annotation as _de_stringify_annotation
91from ..util.typing import eval_name_only as _eval_name_only
92from ..util.typing import fixup_container_fwd_refs
93from ..util.typing import get_origin
94from ..util.typing import is_origin_of_cls
95from ..util.typing import Literal
96from ..util.typing import TupleAny
97from ..util.typing import Unpack
98
99if typing.TYPE_CHECKING:
100 from ._typing import _EntityType
101 from ._typing import _IdentityKeyType
102 from ._typing import _InternalEntityType
103 from ._typing import _ORMCOLEXPR
104 from .context import _MapperEntity
105 from .context import _ORMCompileState
106 from .mapper import Mapper
107 from .path_registry import _AbstractEntityRegistry
108 from .query import Query
109 from .relationships import RelationshipProperty
110 from ..engine import Row
111 from ..engine import RowMapping
112 from ..sql._typing import _CE
113 from ..sql._typing import _ColumnExpressionArgument
114 from ..sql._typing import _EquivalentColumnMap
115 from ..sql._typing import _FromClauseArgument
116 from ..sql._typing import _OnClauseArgument
117 from ..sql._typing import _PropagateAttrsType
118 from ..sql.annotation import _SA
119 from ..sql.base import ReadOnlyColumnCollection
120 from ..sql.elements import BindParameter
121 from ..sql.selectable import _ColumnsClauseElement
122 from ..sql.selectable import Select
123 from ..sql.selectable import Selectable
124 from ..sql.visitors import anon_map
125 from ..util.typing import _AnnotationScanType
126
127_T = TypeVar("_T", bound=Any)
128
129all_cascades = frozenset(
130 (
131 "delete",
132 "delete-orphan",
133 "all",
134 "merge",
135 "expunge",
136 "save-update",
137 "refresh-expire",
138 "none",
139 )
140)
141
142_de_stringify_partial = functools.partial(
143 functools.partial,
144 locals_=util.immutabledict(
145 {
146 "Mapped": Mapped,
147 "WriteOnlyMapped": WriteOnlyMapped,
148 "DynamicMapped": DynamicMapped,
149 }
150 ),
151)
152
153# partial is practically useless as we have to write out the whole
154# function and maintain the signature anyway
155
156
157class _DeStringifyAnnotation(Protocol):
158 def __call__(
159 self,
160 cls: Type[Any],
161 annotation: _AnnotationScanType,
162 originating_module: str,
163 *,
164 str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
165 include_generic: bool = False,
166 ) -> Type[Any]: ...
167
168
169de_stringify_annotation = cast(
170 _DeStringifyAnnotation, _de_stringify_partial(_de_stringify_annotation)
171)
172
173
174class _EvalNameOnly(Protocol):
175 def __call__(self, name: str, module_name: str) -> Any: ...
176
177
178eval_name_only = cast(_EvalNameOnly, _de_stringify_partial(_eval_name_only))
179
180
181class CascadeOptions(FrozenSet[str]):
182 """Keeps track of the options sent to
183 :paramref:`.relationship.cascade`"""
184
185 _add_w_all_cascades = all_cascades.difference(
186 ["all", "none", "delete-orphan"]
187 )
188 _allowed_cascades = all_cascades
189
190 _viewonly_cascades = ["expunge", "all", "none", "refresh-expire", "merge"]
191
192 __slots__ = (
193 "save_update",
194 "delete",
195 "refresh_expire",
196 "merge",
197 "expunge",
198 "delete_orphan",
199 )
200
201 save_update: bool
202 delete: bool
203 refresh_expire: bool
204 merge: bool
205 expunge: bool
206 delete_orphan: bool
207
208 def __new__(
209 cls, value_list: Optional[Union[Iterable[str], str]]
210 ) -> CascadeOptions:
211 if isinstance(value_list, str) or value_list is None:
212 return cls.from_string(value_list) # type: ignore
213 values = set(value_list)
214 if values.difference(cls._allowed_cascades):
215 raise sa_exc.ArgumentError(
216 "Invalid cascade option(s): %s"
217 % ", ".join(
218 [
219 repr(x)
220 for x in sorted(
221 values.difference(cls._allowed_cascades)
222 )
223 ]
224 )
225 )
226
227 if "all" in values:
228 values.update(cls._add_w_all_cascades)
229 if "none" in values:
230 values.clear()
231 values.discard("all")
232
233 self = super().__new__(cls, values)
234 self.save_update = "save-update" in values
235 self.delete = "delete" in values
236 self.refresh_expire = "refresh-expire" in values
237 self.merge = "merge" in values
238 self.expunge = "expunge" in values
239 self.delete_orphan = "delete-orphan" in values
240
241 if self.delete_orphan and not self.delete:
242 util.warn("The 'delete-orphan' cascade option requires 'delete'.")
243 return self
244
245 def __repr__(self):
246 return "CascadeOptions(%r)" % (",".join([x for x in sorted(self)]))
247
248 @classmethod
249 def from_string(cls, arg):
250 values = [c for c in re.split(r"\s*,\s*", arg or "") if c]
251 return cls(values)
252
253
254def _validator_events(desc, key, validator, include_removes, include_backrefs):
255 """Runs a validation method on an attribute value to be set or
256 appended.
257 """
258
259 if not include_backrefs:
260
261 def detect_is_backref(state, initiator):
262 impl = state.manager[key].impl
263 return initiator.impl is not impl
264
265 if include_removes:
266
267 def append(state, value, initiator):
268 if initiator.op is not attributes.OP_BULK_REPLACE and (
269 include_backrefs or not detect_is_backref(state, initiator)
270 ):
271 return validator(state.obj(), key, value, False)
272 else:
273 return value
274
275 def bulk_set(state, values, initiator):
276 if include_backrefs or not detect_is_backref(state, initiator):
277 obj = state.obj()
278 values[:] = [
279 validator(obj, key, value, False) for value in values
280 ]
281
282 def set_(state, value, oldvalue, initiator):
283 if include_backrefs or not detect_is_backref(state, initiator):
284 return validator(state.obj(), key, value, False)
285 else:
286 return value
287
288 def remove(state, value, initiator):
289 if include_backrefs or not detect_is_backref(state, initiator):
290 validator(state.obj(), key, value, True)
291
292 else:
293
294 def append(state, value, initiator):
295 if initiator.op is not attributes.OP_BULK_REPLACE and (
296 include_backrefs or not detect_is_backref(state, initiator)
297 ):
298 return validator(state.obj(), key, value)
299 else:
300 return value
301
302 def bulk_set(state, values, initiator):
303 if include_backrefs or not detect_is_backref(state, initiator):
304 obj = state.obj()
305 values[:] = [validator(obj, key, value) for value in values]
306
307 def set_(state, value, oldvalue, initiator):
308 if include_backrefs or not detect_is_backref(state, initiator):
309 return validator(state.obj(), key, value)
310 else:
311 return value
312
313 event.listen(desc, "append", append, raw=True, retval=True)
314 event.listen(desc, "bulk_replace", bulk_set, raw=True)
315 event.listen(desc, "set", set_, raw=True, retval=True)
316 if include_removes:
317 event.listen(desc, "remove", remove, raw=True, retval=True)
318
319
320def polymorphic_union(
321 table_map, typecolname, aliasname="p_union", cast_nulls=True
322):
323 """Create a ``UNION`` statement used by a polymorphic mapper.
324
325 See :ref:`concrete_inheritance` for an example of how
326 this is used.
327
328 :param table_map: mapping of polymorphic identities to
329 :class:`_schema.Table` objects.
330 :param typecolname: string name of a "discriminator" column, which will be
331 derived from the query, producing the polymorphic identity for
332 each row. If ``None``, no polymorphic discriminator is generated.
333 :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()`
334 construct generated.
335 :param cast_nulls: if True, non-existent columns, which are represented
336 as labeled NULLs, will be passed into CAST. This is a legacy behavior
337 that is problematic on some backends such as Oracle - in which case it
338 can be set to False.
339
340 """
341
342 colnames: util.OrderedSet[str] = util.OrderedSet()
343 colnamemaps = {}
344 types = {}
345 for key in table_map:
346 table = table_map[key]
347
348 table = coercions.expect(roles.FromClauseRole, table)
349 table_map[key] = table
350
351 m = {}
352 for c in table.c:
353 if c.key == typecolname:
354 raise sa_exc.InvalidRequestError(
355 "Polymorphic union can't use '%s' as the discriminator "
356 "column due to mapped column %r; please apply the "
357 "'typecolname' "
358 "argument; this is available on "
359 "ConcreteBase as '_concrete_discriminator_name'"
360 % (typecolname, c)
361 )
362 colnames.add(c.key)
363 m[c.key] = c
364 types[c.key] = c.type
365 colnamemaps[table] = m
366
367 def col(name, table):
368 try:
369 return colnamemaps[table][name]
370 except KeyError:
371 if cast_nulls:
372 return sql.cast(sql.null(), types[name]).label(name)
373 else:
374 return sql.type_coerce(sql.null(), types[name]).label(name)
375
376 result = []
377 for type_, table in table_map.items():
378 if typecolname is not None:
379 result.append(
380 sql.select(
381 *(
382 [col(name, table) for name in colnames]
383 + [
384 sql.literal_column(
385 sql_util._quote_ddl_expr(type_)
386 ).label(typecolname)
387 ]
388 )
389 ).select_from(table)
390 )
391 else:
392 result.append(
393 sql.select(
394 *[col(name, table) for name in colnames]
395 ).select_from(table)
396 )
397 return sql.union_all(*result).alias(aliasname)
398
399
400def identity_key(
401 class_: Optional[Type[_T]] = None,
402 ident: Union[Any, Tuple[Any, ...]] = None,
403 *,
404 instance: Optional[_T] = None,
405 row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None,
406 identity_token: Optional[Any] = None,
407) -> _IdentityKeyType[_T]:
408 r"""Generate "identity key" tuples, as are used as keys in the
409 :attr:`.Session.identity_map` dictionary.
410
411 This function has several call styles:
412
413 * ``identity_key(class, ident, identity_token=token)``
414
415 This form receives a mapped class and a primary key scalar or
416 tuple as an argument.
417
418 E.g.::
419
420 >>> identity_key(MyClass, (1, 2))
421 (<class '__main__.MyClass'>, (1, 2), None)
422
423 :param class: mapped class (must be a positional argument)
424 :param ident: primary key, may be a scalar or tuple argument.
425 :param identity_token: optional identity token
426
427 * ``identity_key(instance=instance)``
428
429 This form will produce the identity key for a given instance. The
430 instance need not be persistent, only that its primary key attributes
431 are populated (else the key will contain ``None`` for those missing
432 values).
433
434 E.g.::
435
436 >>> instance = MyClass(1, 2)
437 >>> identity_key(instance=instance)
438 (<class '__main__.MyClass'>, (1, 2), None)
439
440 In this form, the given instance is ultimately run though
441 :meth:`_orm.Mapper.identity_key_from_instance`, which will have the
442 effect of performing a database check for the corresponding row
443 if the object is expired.
444
445 :param instance: object instance (must be given as a keyword arg)
446
447 * ``identity_key(class, row=row, identity_token=token)``
448
449 This form is similar to the class/tuple form, except is passed a
450 database result row as a :class:`.Row` or :class:`.RowMapping` object.
451
452 E.g.::
453
454 >>> row = engine.execute(text("select * from table where a=1 and b=2")).first()
455 >>> identity_key(MyClass, row=row)
456 (<class '__main__.MyClass'>, (1, 2), None)
457
458 :param class: mapped class (must be a positional argument)
459 :param row: :class:`.Row` row returned by a :class:`_engine.CursorResult`
460 (must be given as a keyword arg)
461 :param identity_token: optional identity token
462
463 """ # noqa: E501
464 if class_ is not None:
465 mapper = class_mapper(class_)
466 if row is None:
467 if ident is None:
468 raise sa_exc.ArgumentError("ident or row is required")
469 return mapper.identity_key_from_primary_key(
470 tuple(util.to_list(ident)), identity_token=identity_token
471 )
472 else:
473 return mapper.identity_key_from_row(
474 row, identity_token=identity_token
475 )
476 elif instance is not None:
477 mapper = object_mapper(instance)
478 return mapper.identity_key_from_instance(instance)
479 else:
480 raise sa_exc.ArgumentError("class or instance is required")
481
482
483class _TraceAdaptRole(enum.Enum):
484 """Enumeration of all the use cases for ORMAdapter.
485
486 ORMAdapter remains one of the most complicated aspects of the ORM, as it is
487 used for in-place adaption of column expressions to be applied to a SELECT,
488 replacing :class:`.Table` and other objects that are mapped to classes with
489 aliases of those tables in the case of joined eager loading, or in the case
490 of polymorphic loading as used with concrete mappings or other custom "with
491 polymorphic" parameters, with whole user-defined subqueries. The
492 enumerations provide an overview of all the use cases used by ORMAdapter, a
493 layer of formality as to the introduction of new ORMAdapter use cases (of
494 which none are anticipated), as well as a means to trace the origins of a
495 particular ORMAdapter within runtime debugging.
496
497 SQLAlchemy 2.0 has greatly scaled back ORM features which relied heavily on
498 open-ended statement adaption, including the ``Query.with_polymorphic()``
499 method and the ``Query.select_from_entity()`` methods, favoring
500 user-explicit aliasing schemes using the ``aliased()`` and
501 ``with_polymorphic()`` standalone constructs; these still use adaption,
502 however the adaption is applied in a narrower scope.
503
504 """
505
506 # aliased() use that is used to adapt individual attributes at query
507 # construction time
508 ALIASED_INSP = enum.auto()
509
510 # joinedload cases; typically adapt an ON clause of a relationship
511 # join
512 JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto()
513 JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto()
514 JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto()
515
516 # polymorphic cases - these are complex ones that replace FROM
517 # clauses, replacing tables with subqueries
518 MAPPER_POLYMORPHIC_ADAPTER = enum.auto()
519 WITH_POLYMORPHIC_ADAPTER = enum.auto()
520 WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto()
521 DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto()
522
523 # the from_statement() case, used only to adapt individual attributes
524 # from a given statement to local ORM attributes at result fetching
525 # time. assigned to ORMCompileState._from_obj_alias
526 ADAPT_FROM_STATEMENT = enum.auto()
527
528 # the joinedload for queries that have LIMIT/OFFSET/DISTINCT case;
529 # the query is placed inside of a subquery with the LIMIT/OFFSET/etc.,
530 # joinedloads are then placed on the outside.
531 # assigned to ORMCompileState.compound_eager_adapter
532 COMPOUND_EAGER_STATEMENT = enum.auto()
533
534 # the legacy Query._set_select_from() case.
535 # this is needed for Query's set operations (i.e. UNION, etc. )
536 # as well as "legacy from_self()", which while removed from 2.0 as
537 # public API, is used for the Query.count() method. this one
538 # still does full statement traversal
539 # assigned to ORMCompileState._from_obj_alias
540 LEGACY_SELECT_FROM_ALIAS = enum.auto()
541
542
543class ORMStatementAdapter(sql_util.ColumnAdapter):
544 """ColumnAdapter which includes a role attribute."""
545
546 __slots__ = ("role",)
547
548 def __init__(
549 self,
550 role: _TraceAdaptRole,
551 selectable: Selectable,
552 *,
553 equivalents: Optional[_EquivalentColumnMap] = None,
554 adapt_required: bool = False,
555 allow_label_resolve: bool = True,
556 anonymize_labels: bool = False,
557 adapt_on_names: bool = False,
558 adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
559 ):
560 self.role = role
561 super().__init__(
562 selectable,
563 equivalents=equivalents,
564 adapt_required=adapt_required,
565 allow_label_resolve=allow_label_resolve,
566 anonymize_labels=anonymize_labels,
567 adapt_on_names=adapt_on_names,
568 adapt_from_selectables=adapt_from_selectables,
569 )
570
571
572class ORMAdapter(sql_util.ColumnAdapter):
573 """ColumnAdapter subclass which excludes adaptation of entities from
574 non-matching mappers.
575
576 """
577
578 __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp")
579
580 is_aliased_class: bool
581 aliased_insp: Optional[AliasedInsp[Any]]
582
583 def __init__(
584 self,
585 role: _TraceAdaptRole,
586 entity: _InternalEntityType[Any],
587 *,
588 equivalents: Optional[_EquivalentColumnMap] = None,
589 adapt_required: bool = False,
590 allow_label_resolve: bool = True,
591 anonymize_labels: bool = False,
592 selectable: Optional[Selectable] = None,
593 limit_on_entity: bool = True,
594 adapt_on_names: bool = False,
595 adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
596 ):
597 self.role = role
598 self.mapper = entity.mapper
599 if selectable is None:
600 selectable = entity.selectable
601 if insp_is_aliased_class(entity):
602 self.is_aliased_class = True
603 self.aliased_insp = entity
604 else:
605 self.is_aliased_class = False
606 self.aliased_insp = None
607
608 super().__init__(
609 selectable,
610 equivalents,
611 adapt_required=adapt_required,
612 allow_label_resolve=allow_label_resolve,
613 anonymize_labels=anonymize_labels,
614 include_fn=self._include_fn if limit_on_entity else None,
615 adapt_on_names=adapt_on_names,
616 adapt_from_selectables=adapt_from_selectables,
617 )
618
619 def _include_fn(self, elem):
620 entity = elem._annotations.get("parentmapper", None)
621
622 return not entity or entity.isa(self.mapper) or self.mapper.isa(entity)
623
624
625class AliasedClass(
626 inspection.Inspectable["AliasedInsp[_O]"], ORMColumnsClauseRole[_O]
627):
628 r"""Represents an "aliased" form of a mapped class for usage with Query.
629
630 The ORM equivalent of a :func:`~sqlalchemy.sql.expression.alias`
631 construct, this object mimics the mapped class using a
632 ``__getattr__`` scheme and maintains a reference to a
633 real :class:`~sqlalchemy.sql.expression.Alias` object.
634
635 A primary purpose of :class:`.AliasedClass` is to serve as an alternate
636 within a SQL statement generated by the ORM, such that an existing
637 mapped entity can be used in multiple contexts. A simple example::
638
639 # find all pairs of users with the same name
640 user_alias = aliased(User)
641 session.query(User, user_alias).join(
642 (user_alias, User.id > user_alias.id)
643 ).filter(User.name == user_alias.name)
644
645 :class:`.AliasedClass` is also capable of mapping an existing mapped
646 class to an entirely new selectable, provided this selectable is column-
647 compatible with the existing mapped selectable, and it can also be
648 configured in a mapping as the target of a :func:`_orm.relationship`.
649 See the links below for examples.
650
651 The :class:`.AliasedClass` object is constructed typically using the
652 :func:`_orm.aliased` function. It also is produced with additional
653 configuration when using the :func:`_orm.with_polymorphic` function.
654
655 The resulting object is an instance of :class:`.AliasedClass`.
656 This object implements an attribute scheme which produces the
657 same attribute and method interface as the original mapped
658 class, allowing :class:`.AliasedClass` to be compatible
659 with any attribute technique which works on the original class,
660 including hybrid attributes (see :ref:`hybrids_toplevel`).
661
662 The :class:`.AliasedClass` can be inspected for its underlying
663 :class:`_orm.Mapper`, aliased selectable, and other information
664 using :func:`_sa.inspect`::
665
666 from sqlalchemy import inspect
667
668 my_alias = aliased(MyClass)
669 insp = inspect(my_alias)
670
671 The resulting inspection object is an instance of :class:`.AliasedInsp`.
672
673
674 .. seealso::
675
676 :func:`.aliased`
677
678 :func:`.with_polymorphic`
679
680 :ref:`relationship_aliased_class`
681
682 :ref:`relationship_to_window_function`
683
684
685 """
686
687 __name__: str
688
689 def __init__(
690 self,
691 mapped_class_or_ac: _EntityType[_O],
692 alias: Optional[FromClause] = None,
693 name: Optional[str] = None,
694 flat: bool = False,
695 adapt_on_names: bool = False,
696 with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]] = None,
697 with_polymorphic_discriminator: Optional[ColumnElement[Any]] = None,
698 base_alias: Optional[AliasedInsp[Any]] = None,
699 use_mapper_path: bool = False,
700 represents_outer_join: bool = False,
701 ):
702 insp = cast(
703 "_InternalEntityType[_O]", inspection.inspect(mapped_class_or_ac)
704 )
705 mapper = insp.mapper
706
707 nest_adapters = False
708
709 if alias is None:
710 if insp.is_aliased_class and insp.selectable._is_subquery:
711 alias = insp.selectable.alias()
712 else:
713 alias = (
714 mapper._with_polymorphic_selectable._anonymous_fromclause(
715 name=name,
716 flat=flat,
717 )
718 )
719 elif insp.is_aliased_class:
720 nest_adapters = True
721
722 assert alias is not None
723 self._aliased_insp = AliasedInsp(
724 self,
725 insp,
726 alias,
727 name,
728 (
729 with_polymorphic_mappers
730 if with_polymorphic_mappers
731 else mapper.with_polymorphic_mappers
732 ),
733 (
734 with_polymorphic_discriminator
735 if with_polymorphic_discriminator is not None
736 else mapper.polymorphic_on
737 ),
738 base_alias,
739 use_mapper_path,
740 adapt_on_names,
741 represents_outer_join,
742 nest_adapters,
743 )
744
745 self.__name__ = f"aliased({mapper.class_.__name__})"
746
747 @classmethod
748 def _reconstitute_from_aliased_insp(
749 cls, aliased_insp: AliasedInsp[_O]
750 ) -> AliasedClass[_O]:
751 obj = cls.__new__(cls)
752 obj.__name__ = f"aliased({aliased_insp.mapper.class_.__name__})"
753 obj._aliased_insp = aliased_insp
754
755 if aliased_insp._is_with_polymorphic:
756 for sub_aliased_insp in aliased_insp._with_polymorphic_entities:
757 if sub_aliased_insp is not aliased_insp:
758 ent = AliasedClass._reconstitute_from_aliased_insp(
759 sub_aliased_insp
760 )
761 setattr(obj, sub_aliased_insp.class_.__name__, ent)
762
763 return obj
764
765 def __getattr__(self, key: str) -> Any:
766 try:
767 _aliased_insp = self.__dict__["_aliased_insp"]
768 except KeyError:
769 raise AttributeError()
770 else:
771 target = _aliased_insp._target
772 # maintain all getattr mechanics
773 attr = getattr(target, key)
774
775 # attribute is a method, that will be invoked against a
776 # "self"; so just return a new method with the same function and
777 # new self
778 if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
779 return types.MethodType(attr.__func__, self)
780
781 # attribute is a descriptor, that will be invoked against a
782 # "self"; so invoke the descriptor against this self
783 if hasattr(attr, "__get__"):
784 attr = attr.__get__(None, self)
785
786 # attributes within the QueryableAttribute system will want this
787 # to be invoked so the object can be adapted
788 if hasattr(attr, "adapt_to_entity"):
789 attr = attr.adapt_to_entity(_aliased_insp)
790 setattr(self, key, attr)
791
792 return attr
793
794 def _get_from_serialized(
795 self, key: str, mapped_class: _O, aliased_insp: AliasedInsp[_O]
796 ) -> Any:
797 # this method is only used in terms of the
798 # sqlalchemy.ext.serializer extension
799 attr = getattr(mapped_class, key)
800 if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
801 return types.MethodType(attr.__func__, self)
802
803 # attribute is a descriptor, that will be invoked against a
804 # "self"; so invoke the descriptor against this self
805 if hasattr(attr, "__get__"):
806 attr = attr.__get__(None, self)
807
808 # attributes within the QueryableAttribute system will want this
809 # to be invoked so the object can be adapted
810 if hasattr(attr, "adapt_to_entity"):
811 aliased_insp._weak_entity = weakref.ref(self)
812 attr = attr.adapt_to_entity(aliased_insp)
813 setattr(self, key, attr)
814
815 return attr
816
817 def __repr__(self) -> str:
818 return "<AliasedClass at 0x%x; %s>" % (
819 id(self),
820 self._aliased_insp._target.__name__,
821 )
822
823 def __str__(self) -> str:
824 return str(self._aliased_insp)
825
826
827@inspection._self_inspects
828class AliasedInsp(
829 ORMEntityColumnsClauseRole[_O],
830 ORMFromClauseRole,
831 HasCacheKey,
832 InspectionAttr,
833 MemoizedSlots,
834 inspection.Inspectable["AliasedInsp[_O]"],
835 Generic[_O],
836):
837 """Provide an inspection interface for an
838 :class:`.AliasedClass` object.
839
840 The :class:`.AliasedInsp` object is returned
841 given an :class:`.AliasedClass` using the
842 :func:`_sa.inspect` function::
843
844 from sqlalchemy import inspect
845 from sqlalchemy.orm import aliased
846
847 my_alias = aliased(MyMappedClass)
848 insp = inspect(my_alias)
849
850 Attributes on :class:`.AliasedInsp`
851 include:
852
853 * ``entity`` - the :class:`.AliasedClass` represented.
854 * ``mapper`` - the :class:`_orm.Mapper` mapping the underlying class.
855 * ``selectable`` - the :class:`_expression.Alias`
856 construct which ultimately
857 represents an aliased :class:`_schema.Table` or
858 :class:`_expression.Select`
859 construct.
860 * ``name`` - the name of the alias. Also is used as the attribute
861 name when returned in a result tuple from :class:`_query.Query`.
862 * ``with_polymorphic_mappers`` - collection of :class:`_orm.Mapper`
863 objects
864 indicating all those mappers expressed in the select construct
865 for the :class:`.AliasedClass`.
866 * ``polymorphic_on`` - an alternate column or SQL expression which
867 will be used as the "discriminator" for a polymorphic load.
868
869 .. seealso::
870
871 :ref:`inspection_toplevel`
872
873 """
874
875 __slots__ = (
876 "__weakref__",
877 "_weak_entity",
878 "mapper",
879 "selectable",
880 "name",
881 "_adapt_on_names",
882 "with_polymorphic_mappers",
883 "polymorphic_on",
884 "_use_mapper_path",
885 "_base_alias",
886 "represents_outer_join",
887 "persist_selectable",
888 "local_table",
889 "_is_with_polymorphic",
890 "_with_polymorphic_entities",
891 "_adapter",
892 "_target",
893 "__clause_element__",
894 "_memoized_values",
895 "_all_column_expressions",
896 "_nest_adapters",
897 )
898
899 _cache_key_traversal = [
900 ("name", visitors.ExtendedInternalTraversal.dp_string),
901 ("_adapt_on_names", visitors.ExtendedInternalTraversal.dp_boolean),
902 ("_use_mapper_path", visitors.ExtendedInternalTraversal.dp_boolean),
903 ("_target", visitors.ExtendedInternalTraversal.dp_inspectable),
904 ("selectable", visitors.ExtendedInternalTraversal.dp_clauseelement),
905 (
906 "with_polymorphic_mappers",
907 visitors.InternalTraversal.dp_has_cache_key_list,
908 ),
909 ("polymorphic_on", visitors.InternalTraversal.dp_clauseelement),
910 ]
911
912 mapper: Mapper[_O]
913 selectable: FromClause
914 _adapter: ORMAdapter
915 with_polymorphic_mappers: Sequence[Mapper[Any]]
916 _with_polymorphic_entities: Sequence[AliasedInsp[Any]]
917
918 _weak_entity: weakref.ref[AliasedClass[_O]]
919 """the AliasedClass that refers to this AliasedInsp"""
920
921 _target: Union[Type[_O], AliasedClass[_O]]
922 """the thing referenced by the AliasedClass/AliasedInsp.
923
924 In the vast majority of cases, this is the mapped class. However
925 it may also be another AliasedClass (alias of alias).
926
927 """
928
929 def __init__(
930 self,
931 entity: AliasedClass[_O],
932 inspected: _InternalEntityType[_O],
933 selectable: FromClause,
934 name: Optional[str],
935 with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]],
936 polymorphic_on: Optional[ColumnElement[Any]],
937 _base_alias: Optional[AliasedInsp[Any]],
938 _use_mapper_path: bool,
939 adapt_on_names: bool,
940 represents_outer_join: bool,
941 nest_adapters: bool,
942 ):
943 mapped_class_or_ac = inspected.entity
944 mapper = inspected.mapper
945
946 self._weak_entity = weakref.ref(entity)
947 self.mapper = mapper
948 self.selectable = self.persist_selectable = self.local_table = (
949 selectable
950 )
951 self.name = name
952 self.polymorphic_on = polymorphic_on
953 self._base_alias = weakref.ref(_base_alias or self)
954 self._use_mapper_path = _use_mapper_path
955 self.represents_outer_join = represents_outer_join
956 self._nest_adapters = nest_adapters
957
958 if with_polymorphic_mappers:
959 self._is_with_polymorphic = True
960 self.with_polymorphic_mappers = with_polymorphic_mappers
961 self._with_polymorphic_entities = []
962 for poly in self.with_polymorphic_mappers:
963 if poly is not mapper:
964 ent = AliasedClass(
965 poly.class_,
966 selectable,
967 base_alias=self,
968 adapt_on_names=adapt_on_names,
969 use_mapper_path=_use_mapper_path,
970 )
971
972 setattr(self.entity, poly.class_.__name__, ent)
973 self._with_polymorphic_entities.append(ent._aliased_insp)
974
975 else:
976 self._is_with_polymorphic = False
977 self.with_polymorphic_mappers = [mapper]
978
979 self._adapter = ORMAdapter(
980 _TraceAdaptRole.ALIASED_INSP,
981 mapper,
982 selectable=selectable,
983 equivalents=mapper._equivalent_columns,
984 adapt_on_names=adapt_on_names,
985 anonymize_labels=True,
986 # make sure the adapter doesn't try to grab other tables that
987 # are not even the thing we are mapping, such as embedded
988 # selectables in subqueries or CTEs. See issue #6060
989 adapt_from_selectables={
990 m.selectable
991 for m in self.with_polymorphic_mappers
992 if not adapt_on_names
993 },
994 limit_on_entity=False,
995 )
996
997 if nest_adapters:
998 # supports "aliased class of aliased class" use case
999 assert isinstance(inspected, AliasedInsp)
1000 self._adapter = inspected._adapter.wrap(self._adapter)
1001
1002 self._adapt_on_names = adapt_on_names
1003 self._target = mapped_class_or_ac
1004
1005 @classmethod
1006 def _alias_factory(
1007 cls,
1008 element: Union[_EntityType[_O], FromClause],
1009 alias: Optional[FromClause] = None,
1010 name: Optional[str] = None,
1011 flat: bool = False,
1012 adapt_on_names: bool = False,
1013 ) -> Union[AliasedClass[_O], FromClause]:
1014 if isinstance(element, FromClause):
1015 if adapt_on_names:
1016 raise sa_exc.ArgumentError(
1017 "adapt_on_names only applies to ORM elements"
1018 )
1019 if name:
1020 return element.alias(name=name, flat=flat)
1021 else:
1022 return coercions.expect(
1023 roles.AnonymizedFromClauseRole, element, flat=flat
1024 )
1025 else:
1026 return AliasedClass(
1027 element,
1028 alias=alias,
1029 flat=flat,
1030 name=name,
1031 adapt_on_names=adapt_on_names,
1032 )
1033
1034 @classmethod
1035 def _with_polymorphic_factory(
1036 cls,
1037 base: Union[Type[_O], Mapper[_O]],
1038 classes: Union[Literal["*"], Iterable[_EntityType[Any]]],
1039 selectable: Union[Literal[False, None], FromClause] = False,
1040 flat: bool = False,
1041 polymorphic_on: Optional[ColumnElement[Any]] = None,
1042 aliased: bool = False,
1043 innerjoin: bool = False,
1044 adapt_on_names: bool = False,
1045 name: Optional[str] = None,
1046 _use_mapper_path: bool = False,
1047 ) -> AliasedClass[_O]:
1048 primary_mapper = _class_to_mapper(base)
1049
1050 if selectable not in (None, False) and flat:
1051 raise sa_exc.ArgumentError(
1052 "the 'flat' and 'selectable' arguments cannot be passed "
1053 "simultaneously to with_polymorphic()"
1054 )
1055
1056 mappers, selectable = primary_mapper._with_polymorphic_args(
1057 classes, selectable, innerjoin=innerjoin
1058 )
1059 if aliased or flat:
1060 assert selectable is not None
1061 selectable = selectable._anonymous_fromclause(flat=flat)
1062
1063 return AliasedClass(
1064 base,
1065 selectable,
1066 name=name,
1067 with_polymorphic_mappers=mappers,
1068 adapt_on_names=adapt_on_names,
1069 with_polymorphic_discriminator=polymorphic_on,
1070 use_mapper_path=_use_mapper_path,
1071 represents_outer_join=not innerjoin,
1072 )
1073
1074 @property
1075 def entity(self) -> AliasedClass[_O]:
1076 # to eliminate reference cycles, the AliasedClass is held weakly.
1077 # this produces some situations where the AliasedClass gets lost,
1078 # particularly when one is created internally and only the AliasedInsp
1079 # is passed around.
1080 # to work around this case, we just generate a new one when we need
1081 # it, as it is a simple class with very little initial state on it.
1082 ent = self._weak_entity()
1083 if ent is None:
1084 ent = AliasedClass._reconstitute_from_aliased_insp(self)
1085 self._weak_entity = weakref.ref(ent)
1086 return ent
1087
1088 is_aliased_class = True
1089 "always returns True"
1090
1091 def _memoized_method___clause_element__(self) -> FromClause:
1092 return self.selectable._annotate(
1093 {
1094 "parentmapper": self.mapper,
1095 "parententity": self,
1096 "entity_namespace": self,
1097 }
1098 )._set_propagate_attrs(
1099 {"compile_state_plugin": "orm", "plugin_subject": self}
1100 )
1101
1102 @property
1103 def entity_namespace(self) -> AliasedClass[_O]:
1104 return self.entity
1105
1106 @property
1107 def class_(self) -> Type[_O]:
1108 """Return the mapped class ultimately represented by this
1109 :class:`.AliasedInsp`."""
1110 return self.mapper.class_
1111
1112 @property
1113 def _path_registry(self) -> _AbstractEntityRegistry:
1114 if self._use_mapper_path:
1115 return self.mapper._path_registry
1116 else:
1117 return PathRegistry.per_mapper(self)
1118
1119 def __getstate__(self) -> Dict[str, Any]:
1120 return {
1121 "entity": self.entity,
1122 "mapper": self.mapper,
1123 "alias": self.selectable,
1124 "name": self.name,
1125 "adapt_on_names": self._adapt_on_names,
1126 "with_polymorphic_mappers": self.with_polymorphic_mappers,
1127 "with_polymorphic_discriminator": self.polymorphic_on,
1128 "base_alias": self._base_alias(),
1129 "use_mapper_path": self._use_mapper_path,
1130 "represents_outer_join": self.represents_outer_join,
1131 "nest_adapters": self._nest_adapters,
1132 }
1133
1134 def __setstate__(self, state: Dict[str, Any]) -> None:
1135 self.__init__( # type: ignore
1136 state["entity"],
1137 state["mapper"],
1138 state["alias"],
1139 state["name"],
1140 state["with_polymorphic_mappers"],
1141 state["with_polymorphic_discriminator"],
1142 state["base_alias"],
1143 state["use_mapper_path"],
1144 state["adapt_on_names"],
1145 state["represents_outer_join"],
1146 state["nest_adapters"],
1147 )
1148
1149 def _merge_with(self, other: AliasedInsp[_O]) -> AliasedInsp[_O]:
1150 # assert self._is_with_polymorphic
1151 # assert other._is_with_polymorphic
1152
1153 primary_mapper = other.mapper
1154
1155 assert self.mapper is primary_mapper
1156
1157 our_classes = util.to_set(
1158 mp.class_ for mp in self.with_polymorphic_mappers
1159 )
1160 new_classes = {mp.class_ for mp in other.with_polymorphic_mappers}
1161 if our_classes == new_classes:
1162 return other
1163 else:
1164 classes = our_classes.union(new_classes)
1165
1166 mappers, selectable = primary_mapper._with_polymorphic_args(
1167 classes, None, innerjoin=not other.represents_outer_join
1168 )
1169 selectable = selectable._anonymous_fromclause(flat=True)
1170 return AliasedClass(
1171 primary_mapper,
1172 selectable,
1173 with_polymorphic_mappers=mappers,
1174 with_polymorphic_discriminator=other.polymorphic_on,
1175 use_mapper_path=other._use_mapper_path,
1176 represents_outer_join=other.represents_outer_join,
1177 )._aliased_insp
1178
1179 def _adapt_element(
1180 self, expr: _ORMCOLEXPR, key: Optional[str] = None
1181 ) -> _ORMCOLEXPR:
1182 assert isinstance(expr, ColumnElement)
1183 d: Dict[str, Any] = {
1184 "parententity": self,
1185 "parentmapper": self.mapper,
1186 }
1187 if key:
1188 d["proxy_key"] = key
1189
1190 # IMO mypy should see this one also as returning the same type
1191 # we put into it, but it's not
1192 return (
1193 self._adapter.traverse(expr)
1194 ._annotate(d)
1195 ._set_propagate_attrs(
1196 {"compile_state_plugin": "orm", "plugin_subject": self}
1197 )
1198 )
1199
1200 if TYPE_CHECKING:
1201 # establish compatibility with the _ORMAdapterProto protocol,
1202 # which in turn is compatible with _CoreAdapterProto.
1203
1204 def _orm_adapt_element(
1205 self,
1206 obj: _CE,
1207 key: Optional[str] = None,
1208 ) -> _CE: ...
1209
1210 else:
1211 _orm_adapt_element = _adapt_element
1212
1213 def _entity_for_mapper(self, mapper):
1214 self_poly = self.with_polymorphic_mappers
1215 if mapper in self_poly:
1216 if mapper is self.mapper:
1217 return self
1218 else:
1219 return getattr(
1220 self.entity, mapper.class_.__name__
1221 )._aliased_insp
1222 elif mapper.isa(self.mapper):
1223 return self
1224 else:
1225 assert False, "mapper %s doesn't correspond to %s" % (mapper, self)
1226
1227 def _memoized_attr__get_clause(self):
1228 onclause, replacemap = self.mapper._get_clause
1229 return (
1230 self._adapter.traverse(onclause),
1231 {
1232 self._adapter.traverse(col): param
1233 for col, param in replacemap.items()
1234 },
1235 )
1236
1237 def _memoized_attr__memoized_values(self):
1238 return {}
1239
1240 def _memoized_attr__all_column_expressions(self):
1241 if self._is_with_polymorphic:
1242 cols_plus_keys = self.mapper._columns_plus_keys(
1243 [ent.mapper for ent in self._with_polymorphic_entities]
1244 )
1245 else:
1246 cols_plus_keys = self.mapper._columns_plus_keys()
1247
1248 cols_plus_keys = [
1249 (key, self._adapt_element(col)) for key, col in cols_plus_keys
1250 ]
1251
1252 return ColumnCollection(cols_plus_keys)
1253
1254 def _memo(self, key, callable_, *args, **kw):
1255 if key in self._memoized_values:
1256 return self._memoized_values[key]
1257 else:
1258 self._memoized_values[key] = value = callable_(*args, **kw)
1259 return value
1260
1261 def __repr__(self):
1262 if self.with_polymorphic_mappers:
1263 with_poly = "(%s)" % ", ".join(
1264 mp.class_.__name__ for mp in self.with_polymorphic_mappers
1265 )
1266 else:
1267 with_poly = ""
1268 return "<AliasedInsp at 0x%x; %s%s>" % (
1269 id(self),
1270 self.class_.__name__,
1271 with_poly,
1272 )
1273
1274 def __str__(self):
1275 if self._is_with_polymorphic:
1276 return "with_polymorphic(%s, [%s])" % (
1277 self._target.__name__,
1278 ", ".join(
1279 mp.class_.__name__
1280 for mp in self.with_polymorphic_mappers
1281 if mp is not self.mapper
1282 ),
1283 )
1284 else:
1285 return "aliased(%s)" % (self._target.__name__,)
1286
1287
1288class _WrapUserEntity:
1289 """A wrapper used within the loader_criteria lambda caller so that
1290 we can bypass declared_attr descriptors on unmapped mixins, which
1291 normally emit a warning for such use.
1292
1293 might also be useful for other per-lambda instrumentations should
1294 the need arise.
1295
1296 """
1297
1298 __slots__ = ("subject",)
1299
1300 def __init__(self, subject):
1301 self.subject = subject
1302
1303 @util.preload_module("sqlalchemy.orm.decl_api")
1304 def __getattribute__(self, name):
1305 decl_api = util.preloaded.orm.decl_api
1306
1307 subject = object.__getattribute__(self, "subject")
1308 if name in subject.__dict__ and isinstance(
1309 subject.__dict__[name], decl_api.declared_attr
1310 ):
1311 return subject.__dict__[name].fget(subject)
1312 else:
1313 return getattr(subject, name)
1314
1315
1316class LoaderCriteriaOption(CriteriaOption):
1317 """Add additional WHERE criteria to the load for all occurrences of
1318 a particular entity.
1319
1320 :class:`_orm.LoaderCriteriaOption` is invoked using the
1321 :func:`_orm.with_loader_criteria` function; see that function for
1322 details.
1323
1324 .. versionadded:: 1.4
1325
1326 """
1327
1328 __slots__ = (
1329 "root_entity",
1330 "entity",
1331 "deferred_where_criteria",
1332 "where_criteria",
1333 "_where_crit_orig",
1334 "include_aliases",
1335 "propagate_to_loaders",
1336 )
1337
1338 _traverse_internals = [
1339 ("root_entity", visitors.ExtendedInternalTraversal.dp_plain_obj),
1340 ("entity", visitors.ExtendedInternalTraversal.dp_has_cache_key),
1341 ("where_criteria", visitors.InternalTraversal.dp_clauseelement),
1342 ("include_aliases", visitors.InternalTraversal.dp_boolean),
1343 ("propagate_to_loaders", visitors.InternalTraversal.dp_boolean),
1344 ]
1345
1346 root_entity: Optional[Type[Any]]
1347 entity: Optional[_InternalEntityType[Any]]
1348 where_criteria: Union[ColumnElement[bool], lambdas.DeferredLambdaElement]
1349 deferred_where_criteria: bool
1350 include_aliases: bool
1351 propagate_to_loaders: bool
1352
1353 _where_crit_orig: Any
1354
1355 def __init__(
1356 self,
1357 entity_or_base: _EntityType[Any],
1358 where_criteria: Union[
1359 _ColumnExpressionArgument[bool],
1360 Callable[[Any], _ColumnExpressionArgument[bool]],
1361 ],
1362 loader_only: bool = False,
1363 include_aliases: bool = False,
1364 propagate_to_loaders: bool = True,
1365 track_closure_variables: bool = True,
1366 ):
1367 entity = cast(
1368 "_InternalEntityType[Any]",
1369 inspection.inspect(entity_or_base, False),
1370 )
1371 if entity is None:
1372 self.root_entity = cast("Type[Any]", entity_or_base)
1373 self.entity = None
1374 else:
1375 self.root_entity = None
1376 self.entity = entity
1377
1378 self._where_crit_orig = where_criteria
1379 if callable(where_criteria):
1380 if self.root_entity is not None:
1381 wrap_entity = self.root_entity
1382 else:
1383 assert entity is not None
1384 wrap_entity = entity.entity
1385
1386 self.deferred_where_criteria = True
1387 self.where_criteria = lambdas.DeferredLambdaElement(
1388 where_criteria,
1389 roles.WhereHavingRole,
1390 lambda_args=(_WrapUserEntity(wrap_entity),),
1391 opts=lambdas.LambdaOptions(
1392 track_closure_variables=track_closure_variables
1393 ),
1394 )
1395 else:
1396 self.deferred_where_criteria = False
1397 self.where_criteria = coercions.expect(
1398 roles.WhereHavingRole, where_criteria
1399 )
1400
1401 self.include_aliases = include_aliases
1402 self.propagate_to_loaders = propagate_to_loaders
1403
1404 @classmethod
1405 def _unreduce(
1406 cls, entity, where_criteria, include_aliases, propagate_to_loaders
1407 ):
1408 return LoaderCriteriaOption(
1409 entity,
1410 where_criteria,
1411 include_aliases=include_aliases,
1412 propagate_to_loaders=propagate_to_loaders,
1413 )
1414
1415 def __reduce__(self):
1416 return (
1417 LoaderCriteriaOption._unreduce,
1418 (
1419 self.entity.class_ if self.entity else self.root_entity,
1420 self._where_crit_orig,
1421 self.include_aliases,
1422 self.propagate_to_loaders,
1423 ),
1424 )
1425
1426 def _all_mappers(self) -> Iterator[Mapper[Any]]:
1427 if self.entity:
1428 yield from self.entity.mapper.self_and_descendants
1429 else:
1430 assert self.root_entity
1431 stack = list(self.root_entity.__subclasses__())
1432 while stack:
1433 subclass = stack.pop(0)
1434 ent = cast(
1435 "_InternalEntityType[Any]",
1436 inspection.inspect(subclass, raiseerr=False),
1437 )
1438 if ent:
1439 yield from ent.mapper.self_and_descendants
1440 else:
1441 stack.extend(subclass.__subclasses__())
1442
1443 def _should_include(self, compile_state: _ORMCompileState) -> bool:
1444 if (
1445 compile_state.select_statement._annotations.get(
1446 "for_loader_criteria", None
1447 )
1448 is self
1449 ):
1450 return False
1451 return True
1452
1453 def _resolve_where_criteria(
1454 self, ext_info: _InternalEntityType[Any]
1455 ) -> ColumnElement[bool]:
1456 if self.deferred_where_criteria:
1457 crit = cast(
1458 "ColumnElement[bool]",
1459 self.where_criteria._resolve_with_args(ext_info.entity),
1460 )
1461 else:
1462 crit = self.where_criteria # type: ignore
1463 assert isinstance(crit, ColumnElement)
1464 return sql_util._deep_annotate(
1465 crit,
1466 {"for_loader_criteria": self},
1467 detect_subquery_cols=True,
1468 ind_cols_on_fromclause=True,
1469 )
1470
1471 def process_compile_state_replaced_entities(
1472 self,
1473 compile_state: _ORMCompileState,
1474 mapper_entities: Iterable[_MapperEntity],
1475 ) -> None:
1476 self.process_compile_state(compile_state)
1477
1478 def process_compile_state(self, compile_state: _ORMCompileState) -> None:
1479 """Apply a modification to a given :class:`.CompileState`."""
1480
1481 # if options to limit the criteria to immediate query only,
1482 # use compile_state.attributes instead
1483
1484 self.get_global_criteria(compile_state.global_attributes)
1485
1486 def get_global_criteria(self, attributes: Dict[Any, Any]) -> None:
1487 for mp in self._all_mappers():
1488 load_criteria = attributes.setdefault(
1489 ("additional_entity_criteria", mp), []
1490 )
1491
1492 load_criteria.append(self)
1493
1494
1495inspection._inspects(AliasedClass)(lambda target: target._aliased_insp)
1496
1497
1498@inspection._inspects(type)
1499def _inspect_mc(
1500 class_: Type[_O],
1501) -> Optional[Mapper[_O]]:
1502 try:
1503 class_manager = opt_manager_of_class(class_)
1504 if class_manager is None or not class_manager.is_mapped:
1505 return None
1506 mapper = class_manager.mapper
1507 except exc.NO_STATE:
1508 return None
1509 else:
1510 return mapper
1511
1512
1513GenericAlias = type(List[Any])
1514
1515
1516@inspection._inspects(GenericAlias)
1517def _inspect_generic_alias(
1518 class_: Type[_O],
1519) -> Optional[Mapper[_O]]:
1520 origin = cast("Type[_O]", get_origin(class_))
1521 return _inspect_mc(origin)
1522
1523
1524@inspection._self_inspects
1525class Bundle(
1526 ORMColumnsClauseRole[_T],
1527 SupportsCloneAnnotations,
1528 MemoizedHasCacheKey,
1529 inspection.Inspectable["Bundle[_T]"],
1530 InspectionAttr,
1531):
1532 """A grouping of SQL expressions that are returned by a :class:`.Query`
1533 under one namespace.
1534
1535 The :class:`.Bundle` essentially allows nesting of the tuple-based
1536 results returned by a column-oriented :class:`_query.Query` object.
1537 It also
1538 is extensible via simple subclassing, where the primary capability
1539 to override is that of how the set of expressions should be returned,
1540 allowing post-processing as well as custom return types, without
1541 involving ORM identity-mapped classes.
1542
1543 .. seealso::
1544
1545 :ref:`bundles`
1546
1547
1548 """
1549
1550 single_entity = False
1551 """If True, queries for a single Bundle will be returned as a single
1552 entity, rather than an element within a keyed tuple."""
1553
1554 is_clause_element = False
1555
1556 is_mapper = False
1557
1558 is_aliased_class = False
1559
1560 is_bundle = True
1561
1562 _propagate_attrs: _PropagateAttrsType = util.immutabledict()
1563
1564 proxy_set = util.EMPTY_SET
1565
1566 exprs: List[_ColumnsClauseElement]
1567
1568 def __init__(
1569 self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
1570 ):
1571 r"""Construct a new :class:`.Bundle`.
1572
1573 e.g.::
1574
1575 bn = Bundle("mybundle", MyClass.x, MyClass.y)
1576
1577 for row in session.query(bn).filter(bn.c.x == 5).filter(bn.c.y == 4):
1578 print(row.mybundle.x, row.mybundle.y)
1579
1580 :param name: name of the bundle.
1581 :param \*exprs: columns or SQL expressions comprising the bundle.
1582 :param single_entity=False: if True, rows for this :class:`.Bundle`
1583 can be returned as a "single entity" outside of any enclosing tuple
1584 in the same manner as a mapped entity.
1585
1586 """ # noqa: E501
1587 self.name = self._label = name
1588 coerced_exprs = [
1589 coercions.expect(
1590 roles.ColumnsClauseRole, expr, apply_propagate_attrs=self
1591 )
1592 for expr in exprs
1593 ]
1594 self.exprs = coerced_exprs
1595
1596 self.c = self.columns = ColumnCollection(
1597 (getattr(col, "key", col._label), col)
1598 for col in [e._annotations.get("bundle", e) for e in coerced_exprs]
1599 ).as_readonly()
1600 self.single_entity = kw.pop("single_entity", self.single_entity)
1601
1602 def _gen_cache_key(
1603 self, anon_map: anon_map, bindparams: List[BindParameter[Any]]
1604 ) -> Tuple[Any, ...]:
1605 return (self.__class__, self.name, self.single_entity) + tuple(
1606 [expr._gen_cache_key(anon_map, bindparams) for expr in self.exprs]
1607 )
1608
1609 @property
1610 def mapper(self) -> Optional[Mapper[Any]]:
1611 mp: Optional[Mapper[Any]] = self.exprs[0]._annotations.get(
1612 "parentmapper", None
1613 )
1614 return mp
1615
1616 @property
1617 def entity(self) -> Optional[_InternalEntityType[Any]]:
1618 ie: Optional[_InternalEntityType[Any]] = self.exprs[
1619 0
1620 ]._annotations.get("parententity", None)
1621 return ie
1622
1623 @property
1624 def entity_namespace(
1625 self,
1626 ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]:
1627 return self.c
1628
1629 columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
1630
1631 """A namespace of SQL expressions referred to by this :class:`.Bundle`.
1632
1633 e.g.::
1634
1635 bn = Bundle("mybundle", MyClass.x, MyClass.y)
1636
1637 q = sess.query(bn).filter(bn.c.x == 5)
1638
1639 Nesting of bundles is also supported::
1640
1641 b1 = Bundle(
1642 "b1",
1643 Bundle("b2", MyClass.a, MyClass.b),
1644 Bundle("b3", MyClass.x, MyClass.y),
1645 )
1646
1647 q = sess.query(b1).filter(b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9)
1648
1649 .. seealso::
1650
1651 :attr:`.Bundle.c`
1652
1653 """ # noqa: E501
1654
1655 c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
1656 """An alias for :attr:`.Bundle.columns`."""
1657
1658 def _clone(self, **kw):
1659 cloned = self.__class__.__new__(self.__class__)
1660 cloned.__dict__.update(self.__dict__)
1661 return cloned
1662
1663 def __clause_element__(self):
1664 # ensure existing entity_namespace remains
1665 annotations = {"bundle": self, "entity_namespace": self}
1666 annotations.update(self._annotations)
1667
1668 plugin_subject = self.exprs[0]._propagate_attrs.get(
1669 "plugin_subject", self.entity
1670 )
1671 return (
1672 expression.ClauseList(
1673 _literal_as_text_role=roles.ColumnsClauseRole,
1674 group=False,
1675 *[e._annotations.get("bundle", e) for e in self.exprs],
1676 )
1677 ._annotate(annotations)
1678 ._set_propagate_attrs(
1679 # the Bundle *must* use the orm plugin no matter what. the
1680 # subject can be None but it's much better if it's not.
1681 {
1682 "compile_state_plugin": "orm",
1683 "plugin_subject": plugin_subject,
1684 }
1685 )
1686 )
1687
1688 @property
1689 def clauses(self):
1690 return self.__clause_element__().clauses
1691
1692 def label(self, name):
1693 """Provide a copy of this :class:`.Bundle` passing a new label."""
1694
1695 cloned = self._clone()
1696 cloned.name = name
1697 return cloned
1698
1699 def create_row_processor(
1700 self,
1701 query: Select[Unpack[TupleAny]],
1702 procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]],
1703 labels: Sequence[str],
1704 ) -> Callable[[Row[Unpack[TupleAny]]], Any]:
1705 """Produce the "row processing" function for this :class:`.Bundle`.
1706
1707 May be overridden by subclasses to provide custom behaviors when
1708 results are fetched. The method is passed the statement object and a
1709 set of "row processor" functions at query execution time; these
1710 processor functions when given a result row will return the individual
1711 attribute value, which can then be adapted into any kind of return data
1712 structure.
1713
1714 The example below illustrates replacing the usual :class:`.Row`
1715 return structure with a straight Python dictionary::
1716
1717 from sqlalchemy.orm import Bundle
1718
1719
1720 class DictBundle(Bundle):
1721 def create_row_processor(self, query, procs, labels):
1722 "Override create_row_processor to return values as dictionaries"
1723
1724 def proc(row):
1725 return dict(zip(labels, (proc(row) for proc in procs)))
1726
1727 return proc
1728
1729 A result from the above :class:`_orm.Bundle` will return dictionary
1730 values::
1731
1732 bn = DictBundle("mybundle", MyClass.data1, MyClass.data2)
1733 for row in session.execute(select(bn)).where(bn.c.data1 == "d1"):
1734 print(row.mybundle["data1"], row.mybundle["data2"])
1735
1736 """ # noqa: E501
1737 keyed_tuple = result_tuple(labels, [() for l in labels])
1738
1739 def proc(row: Row[Unpack[TupleAny]]) -> Any:
1740 return keyed_tuple([proc(row) for proc in procs])
1741
1742 return proc
1743
1744
1745def _orm_annotate(element: _SA, exclude: Optional[Any] = None) -> _SA:
1746 """Deep copy the given ClauseElement, annotating each element with the
1747 "_orm_adapt" flag.
1748
1749 Elements within the exclude collection will be cloned but not annotated.
1750
1751 """
1752 return sql_util._deep_annotate(element, {"_orm_adapt": True}, exclude)
1753
1754
1755def _orm_deannotate(element: _SA) -> _SA:
1756 """Remove annotations that link a column to a particular mapping.
1757
1758 Note this doesn't affect "remote" and "foreign" annotations
1759 passed by the :func:`_orm.foreign` and :func:`_orm.remote`
1760 annotators.
1761
1762 """
1763
1764 return sql_util._deep_deannotate(
1765 element, values=("_orm_adapt", "parententity")
1766 )
1767
1768
1769def _orm_full_deannotate(element: _SA) -> _SA:
1770 return sql_util._deep_deannotate(element)
1771
1772
1773class _ORMJoin(expression.Join):
1774 """Extend Join to support ORM constructs as input."""
1775
1776 __visit_name__ = expression.Join.__visit_name__
1777
1778 inherit_cache = True
1779
1780 def __init__(
1781 self,
1782 left: _FromClauseArgument,
1783 right: _FromClauseArgument,
1784 onclause: Optional[_OnClauseArgument] = None,
1785 isouter: bool = False,
1786 full: bool = False,
1787 _left_memo: Optional[Any] = None,
1788 _right_memo: Optional[Any] = None,
1789 _extra_criteria: Tuple[ColumnElement[bool], ...] = (),
1790 ):
1791 left_info = cast(
1792 "Union[FromClause, _InternalEntityType[Any]]",
1793 inspection.inspect(left),
1794 )
1795
1796 right_info = cast(
1797 "Union[FromClause, _InternalEntityType[Any]]",
1798 inspection.inspect(right),
1799 )
1800 adapt_to = right_info.selectable
1801
1802 # used by joined eager loader
1803 self._left_memo = _left_memo
1804 self._right_memo = _right_memo
1805
1806 if isinstance(onclause, attributes.QueryableAttribute):
1807 if TYPE_CHECKING:
1808 assert isinstance(
1809 onclause.comparator, RelationshipProperty.Comparator
1810 )
1811 on_selectable = onclause.comparator._source_selectable()
1812 prop = onclause.property
1813 _extra_criteria += onclause._extra_criteria
1814 elif isinstance(onclause, MapperProperty):
1815 # used internally by joined eager loader...possibly not ideal
1816 prop = onclause
1817 on_selectable = prop.parent.selectable
1818 else:
1819 prop = None
1820 on_selectable = None
1821
1822 left_selectable = left_info.selectable
1823 if prop:
1824 adapt_from: Optional[FromClause]
1825 if sql_util.clause_is_present(on_selectable, left_selectable):
1826 adapt_from = on_selectable
1827 else:
1828 assert isinstance(left_selectable, FromClause)
1829 adapt_from = left_selectable
1830
1831 (
1832 pj,
1833 sj,
1834 source,
1835 dest,
1836 secondary,
1837 target_adapter,
1838 ) = prop._create_joins(
1839 source_selectable=adapt_from,
1840 dest_selectable=adapt_to,
1841 source_polymorphic=True,
1842 of_type_entity=right_info,
1843 alias_secondary=True,
1844 extra_criteria=_extra_criteria,
1845 )
1846
1847 if sj is not None:
1848 if isouter:
1849 # note this is an inner join from secondary->right
1850 right = sql.join(secondary, right, sj)
1851 onclause = pj
1852 else:
1853 left = sql.join(left, secondary, pj, isouter)
1854 onclause = sj
1855 else:
1856 onclause = pj
1857
1858 self._target_adapter = target_adapter
1859
1860 # we don't use the normal coercions logic for _ORMJoin
1861 # (probably should), so do some gymnastics to get the entity.
1862 # logic here is for #8721, which was a major bug in 1.4
1863 # for almost two years, not reported/fixed until 1.4.43 (!)
1864 if is_selectable(left_info):
1865 parententity = left_selectable._annotations.get(
1866 "parententity", None
1867 )
1868 elif insp_is_mapper(left_info) or insp_is_aliased_class(left_info):
1869 parententity = left_info
1870 else:
1871 parententity = None
1872
1873 if parententity is not None:
1874 self._annotations = self._annotations.union(
1875 {"parententity": parententity}
1876 )
1877
1878 augment_onclause = bool(_extra_criteria) and not prop
1879 expression.Join.__init__(self, left, right, onclause, isouter, full)
1880
1881 assert self.onclause is not None
1882
1883 if augment_onclause:
1884 self.onclause &= sql.and_(*_extra_criteria)
1885
1886 if (
1887 not prop
1888 and getattr(right_info, "mapper", None)
1889 and right_info.mapper.single # type: ignore
1890 ):
1891 right_info = cast("_InternalEntityType[Any]", right_info)
1892 # if single inheritance target and we are using a manual
1893 # or implicit ON clause, augment it the same way we'd augment the
1894 # WHERE.
1895 single_crit = right_info.mapper._single_table_criterion
1896 if single_crit is not None:
1897 if insp_is_aliased_class(right_info):
1898 single_crit = right_info._adapter.traverse(single_crit)
1899 self.onclause = self.onclause & single_crit
1900
1901 def _splice_into_center(self, other):
1902 """Splice a join into the center.
1903
1904 Given join(a, b) and join(b, c), return join(a, b).join(c)
1905
1906 """
1907 leftmost = other
1908 while isinstance(leftmost, sql.Join):
1909 leftmost = leftmost.left
1910
1911 assert self.right is leftmost
1912
1913 left = _ORMJoin(
1914 self.left,
1915 other.left,
1916 self.onclause,
1917 isouter=self.isouter,
1918 _left_memo=self._left_memo,
1919 _right_memo=other._left_memo._path_registry,
1920 )
1921
1922 return _ORMJoin(
1923 left,
1924 other.right,
1925 other.onclause,
1926 isouter=other.isouter,
1927 _right_memo=other._right_memo,
1928 )
1929
1930 def join(
1931 self,
1932 right: _FromClauseArgument,
1933 onclause: Optional[_OnClauseArgument] = None,
1934 isouter: bool = False,
1935 full: bool = False,
1936 ) -> _ORMJoin:
1937 return _ORMJoin(self, right, onclause, full=full, isouter=isouter)
1938
1939 def outerjoin(
1940 self,
1941 right: _FromClauseArgument,
1942 onclause: Optional[_OnClauseArgument] = None,
1943 full: bool = False,
1944 ) -> _ORMJoin:
1945 return _ORMJoin(self, right, onclause, isouter=True, full=full)
1946
1947
1948def with_parent(
1949 instance: object,
1950 prop: attributes.QueryableAttribute[Any],
1951 from_entity: Optional[_EntityType[Any]] = None,
1952) -> ColumnElement[bool]:
1953 """Create filtering criterion that relates this query's primary entity
1954 to the given related instance, using established
1955 :func:`_orm.relationship()`
1956 configuration.
1957
1958 E.g.::
1959
1960 stmt = select(Address).where(with_parent(some_user, User.addresses))
1961
1962 The SQL rendered is the same as that rendered when a lazy loader
1963 would fire off from the given parent on that attribute, meaning
1964 that the appropriate state is taken from the parent object in
1965 Python without the need to render joins to the parent table
1966 in the rendered statement.
1967
1968 The given property may also make use of :meth:`_orm.PropComparator.of_type`
1969 to indicate the left side of the criteria::
1970
1971
1972 a1 = aliased(Address)
1973 a2 = aliased(Address)
1974 stmt = select(a1, a2).where(with_parent(u1, User.addresses.of_type(a2)))
1975
1976 The above use is equivalent to using the
1977 :func:`_orm.with_parent.from_entity` argument::
1978
1979 a1 = aliased(Address)
1980 a2 = aliased(Address)
1981 stmt = select(a1, a2).where(
1982 with_parent(u1, User.addresses, from_entity=a2)
1983 )
1984
1985 :param instance:
1986 An instance which has some :func:`_orm.relationship`.
1987
1988 :param property:
1989 Class-bound attribute, which indicates
1990 what relationship from the instance should be used to reconcile the
1991 parent/child relationship.
1992
1993 :param from_entity:
1994 Entity in which to consider as the left side. This defaults to the
1995 "zero" entity of the :class:`_query.Query` itself.
1996
1997 """ # noqa: E501
1998 prop_t: RelationshipProperty[Any]
1999
2000 if isinstance(prop, str):
2001 raise sa_exc.ArgumentError(
2002 "with_parent() accepts class-bound mapped attributes, not strings"
2003 )
2004 elif isinstance(prop, attributes.QueryableAttribute):
2005 if prop._of_type:
2006 from_entity = prop._of_type
2007 mapper_property = prop.property
2008 if mapper_property is None or not prop_is_relationship(
2009 mapper_property
2010 ):
2011 raise sa_exc.ArgumentError(
2012 f"Expected relationship property for with_parent(), "
2013 f"got {mapper_property}"
2014 )
2015 prop_t = mapper_property
2016 else:
2017 prop_t = prop
2018
2019 return prop_t._with_parent(instance, from_entity=from_entity)
2020
2021
2022def has_identity(object_: object) -> bool:
2023 """Return True if the given object has a database
2024 identity.
2025
2026 This typically corresponds to the object being
2027 in either the persistent or detached state.
2028
2029 .. seealso::
2030
2031 :func:`.was_deleted`
2032
2033 """
2034 state = attributes.instance_state(object_)
2035 return state.has_identity
2036
2037
2038def was_deleted(object_: object) -> bool:
2039 """Return True if the given object was deleted
2040 within a session flush.
2041
2042 This is regardless of whether or not the object is
2043 persistent or detached.
2044
2045 .. seealso::
2046
2047 :attr:`.InstanceState.was_deleted`
2048
2049 """
2050
2051 state = attributes.instance_state(object_)
2052 return state.was_deleted
2053
2054
2055def _entity_corresponds_to(
2056 given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
2057) -> bool:
2058 """determine if 'given' corresponds to 'entity', in terms
2059 of an entity passed to Query that would match the same entity
2060 being referred to elsewhere in the query.
2061
2062 """
2063 if insp_is_aliased_class(entity):
2064 if insp_is_aliased_class(given):
2065 if entity._base_alias() is given._base_alias():
2066 return True
2067 return False
2068 elif insp_is_aliased_class(given):
2069 if given._use_mapper_path:
2070 return entity in given.with_polymorphic_mappers
2071 else:
2072 return entity is given
2073
2074 assert insp_is_mapper(given)
2075 return entity.common_parent(given)
2076
2077
2078def _entity_corresponds_to_use_path_impl(
2079 given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
2080) -> bool:
2081 """determine if 'given' corresponds to 'entity', in terms
2082 of a path of loader options where a mapped attribute is taken to
2083 be a member of a parent entity.
2084
2085 e.g.::
2086
2087 someoption(A).someoption(A.b) # -> fn(A, A) -> True
2088 someoption(A).someoption(C.d) # -> fn(A, C) -> False
2089
2090 a1 = aliased(A)
2091 someoption(a1).someoption(A.b) # -> fn(a1, A) -> False
2092 someoption(a1).someoption(a1.b) # -> fn(a1, a1) -> True
2093
2094 wp = with_polymorphic(A, [A1, A2])
2095 someoption(wp).someoption(A1.foo) # -> fn(wp, A1) -> False
2096 someoption(wp).someoption(wp.A1.foo) # -> fn(wp, wp.A1) -> True
2097
2098 """
2099 if insp_is_aliased_class(given):
2100 return (
2101 insp_is_aliased_class(entity)
2102 and not entity._use_mapper_path
2103 and (given is entity or entity in given._with_polymorphic_entities)
2104 )
2105 elif not insp_is_aliased_class(entity):
2106 return given.isa(entity.mapper)
2107 else:
2108 return (
2109 entity._use_mapper_path
2110 and given in entity.with_polymorphic_mappers
2111 )
2112
2113
2114def _entity_isa(given: _InternalEntityType[Any], mapper: Mapper[Any]) -> bool:
2115 """determine if 'given' "is a" mapper, in terms of the given
2116 would load rows of type 'mapper'.
2117
2118 """
2119 if given.is_aliased_class:
2120 return mapper in given.with_polymorphic_mappers or given.mapper.isa(
2121 mapper
2122 )
2123 elif given.with_polymorphic_mappers:
2124 return mapper in given.with_polymorphic_mappers or given.isa(mapper)
2125 else:
2126 return given.isa(mapper)
2127
2128
2129def _getitem(iterable_query: Query[Any], item: Any) -> Any:
2130 """calculate __getitem__ in terms of an iterable query object
2131 that also has a slice() method.
2132
2133 """
2134
2135 def _no_negative_indexes():
2136 raise IndexError(
2137 "negative indexes are not accepted by SQL "
2138 "index / slice operators"
2139 )
2140
2141 if isinstance(item, slice):
2142 start, stop, step = util.decode_slice(item)
2143
2144 if (
2145 isinstance(stop, int)
2146 and isinstance(start, int)
2147 and stop - start <= 0
2148 ):
2149 return []
2150
2151 elif (isinstance(start, int) and start < 0) or (
2152 isinstance(stop, int) and stop < 0
2153 ):
2154 _no_negative_indexes()
2155
2156 res = iterable_query.slice(start, stop)
2157 if step is not None:
2158 return list(res)[None : None : item.step]
2159 else:
2160 return list(res)
2161 else:
2162 if item == -1:
2163 _no_negative_indexes()
2164 else:
2165 return list(iterable_query[item : item + 1])[0]
2166
2167
2168def _is_mapped_annotation(
2169 raw_annotation: _AnnotationScanType,
2170 cls: Type[Any],
2171 originating_cls: Type[Any],
2172) -> bool:
2173 try:
2174 annotated = de_stringify_annotation(
2175 cls, raw_annotation, originating_cls.__module__
2176 )
2177 except NameError:
2178 # in most cases, at least within our own tests, we can raise
2179 # here, which is more accurate as it prevents us from returning
2180 # false negatives. However, in the real world, try to avoid getting
2181 # involved with end-user annotations that have nothing to do with us.
2182 # see issue #8888 where we bypass using this function in the case
2183 # that we want to detect an unresolvable Mapped[] type.
2184 return False
2185 else:
2186 return is_origin_of_cls(annotated, _MappedAnnotationBase)
2187
2188
2189class _CleanupError(Exception):
2190 pass
2191
2192
2193def _cleanup_mapped_str_annotation(
2194 annotation: str, originating_module: str
2195) -> str:
2196 # fix up an annotation that comes in as the form:
2197 # 'Mapped[List[Address]]' so that it instead looks like:
2198 # 'Mapped[List["Address"]]' , which will allow us to get
2199 # "Address" as a string
2200
2201 # additionally, resolve symbols for these names since this is where
2202 # we'd have to do it
2203
2204 inner: Optional[Match[str]]
2205
2206 mm = re.match(r"^([^ \|]+?)\[(.+)\]$", annotation)
2207
2208 if not mm:
2209 return annotation
2210
2211 # ticket #8759. Resolve the Mapped name to a real symbol.
2212 # originally this just checked the name.
2213 try:
2214 obj = eval_name_only(mm.group(1), originating_module)
2215 except NameError as ne:
2216 raise _CleanupError(
2217 f'For annotation "{annotation}", could not resolve '
2218 f'container type "{mm.group(1)}". '
2219 "Please ensure this type is imported at the module level "
2220 "outside of TYPE_CHECKING blocks"
2221 ) from ne
2222
2223 if obj is typing.ClassVar:
2224 real_symbol = "ClassVar"
2225 else:
2226 try:
2227 if issubclass(obj, _MappedAnnotationBase):
2228 real_symbol = obj.__name__
2229 else:
2230 return annotation
2231 except TypeError:
2232 # avoid isinstance(obj, type) check, just catch TypeError
2233 return annotation
2234
2235 # note: if one of the codepaths above didn't define real_symbol and
2236 # then didn't return, real_symbol raises UnboundLocalError
2237 # which is actually a NameError, and the calling routines don't
2238 # notice this since they are catching NameError anyway. Just in case
2239 # this is being modified in the future, something to be aware of.
2240
2241 stack = []
2242 inner = mm
2243 while True:
2244 stack.append(real_symbol if mm is inner else inner.group(1))
2245 g2 = inner.group(2)
2246 inner = re.match(r"^([^ \|]+?)\[(.+)\]$", g2)
2247 if inner is None:
2248 stack.append(g2)
2249 break
2250
2251 # stacks we want to rewrite, that is, quote the last entry which
2252 # we think is a relationship class name:
2253 #
2254 # ['Mapped', 'List', 'Address']
2255 # ['Mapped', 'A']
2256 #
2257 # stacks we dont want to rewrite, which are generally MappedColumn
2258 # use cases:
2259 #
2260 # ['Mapped', "'Optional[Dict[str, str]]'"]
2261 # ['Mapped', 'dict[str, str] | None']
2262
2263 if (
2264 # avoid already quoted symbols such as
2265 # ['Mapped', "'Optional[Dict[str, str]]'"]
2266 not re.match(r"""^["'].*["']$""", stack[-1])
2267 # avoid further generics like Dict[] such as
2268 # ['Mapped', 'dict[str, str] | None'],
2269 # ['Mapped', 'list[int] | list[str]'],
2270 # ['Mapped', 'Union[list[int], list[str]]'],
2271 and not re.search(r"[\[\]]", stack[-1])
2272 ):
2273 stripchars = "\"' "
2274 stack[-1] = ", ".join(
2275 f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",")
2276 )
2277
2278 annotation = "[".join(stack) + ("]" * (len(stack) - 1))
2279
2280 return annotation
2281
2282
2283def _extract_mapped_subtype(
2284 raw_annotation: Optional[_AnnotationScanType],
2285 cls: type,
2286 originating_module: str,
2287 key: str,
2288 attr_cls: Type[Any],
2289 required: bool,
2290 is_dataclass_field: bool,
2291 expect_mapped: bool = True,
2292 raiseerr: bool = True,
2293) -> Optional[Tuple[Union[_AnnotationScanType, str], Optional[type]]]:
2294 """given an annotation, figure out if it's ``Mapped[something]`` and if
2295 so, return the ``something`` part.
2296
2297 Includes error raise scenarios and other options.
2298
2299 """
2300
2301 if raw_annotation is None:
2302 if required:
2303 raise orm_exc.MappedAnnotationError(
2304 f"Python typing annotation is required for attribute "
2305 f'"{cls.__name__}.{key}" when primary argument(s) for '
2306 f'"{attr_cls.__name__}" construct are None or not present'
2307 )
2308 return None
2309
2310 try:
2311 # destringify the "outside" of the annotation. note we are not
2312 # adding include_generic so it will *not* dig into generic contents,
2313 # which will remain as ForwardRef or plain str under future annotations
2314 # mode. The full destringify happens later when mapped_column goes
2315 # to do a full lookup in the registry type_annotations_map.
2316 annotated = de_stringify_annotation(
2317 cls,
2318 raw_annotation,
2319 originating_module,
2320 str_cleanup_fn=_cleanup_mapped_str_annotation,
2321 )
2322 except _CleanupError as ce:
2323 raise orm_exc.MappedAnnotationError(
2324 f"Could not interpret annotation {raw_annotation}. "
2325 "Check that it uses names that are correctly imported at the "
2326 "module level. See chained stack trace for more hints."
2327 ) from ce
2328 except NameError as ne:
2329 if raiseerr and "Mapped[" in raw_annotation: # type: ignore
2330 raise orm_exc.MappedAnnotationError(
2331 f"Could not interpret annotation {raw_annotation}. "
2332 "Check that it uses names that are correctly imported at the "
2333 "module level. See chained stack trace for more hints."
2334 ) from ne
2335
2336 annotated = raw_annotation # type: ignore
2337
2338 if is_dataclass_field:
2339 return annotated, None
2340 else:
2341 if not hasattr(annotated, "__origin__") or not is_origin_of_cls(
2342 annotated, _MappedAnnotationBase
2343 ):
2344 if expect_mapped:
2345 if not raiseerr:
2346 return None
2347
2348 origin = getattr(annotated, "__origin__", None)
2349 if origin is typing.ClassVar:
2350 return None
2351
2352 # check for other kind of ORM descriptor like AssociationProxy,
2353 # don't raise for that (issue #9957)
2354 elif isinstance(origin, type) and issubclass(
2355 origin, ORMDescriptor
2356 ):
2357 return None
2358
2359 raise orm_exc.MappedAnnotationError(
2360 f'Type annotation for "{cls.__name__}.{key}" '
2361 "can't be correctly interpreted for "
2362 "Annotated Declarative Table form. ORM annotations "
2363 "should normally make use of the ``Mapped[]`` generic "
2364 "type, or other ORM-compatible generic type, as a "
2365 "container for the actual type, which indicates the "
2366 "intent that the attribute is mapped. "
2367 "Class variables that are not intended to be mapped "
2368 "by the ORM should use ClassVar[]. "
2369 "To allow Annotated Declarative to disregard legacy "
2370 "annotations which don't use Mapped[] to pass, set "
2371 '"__allow_unmapped__ = True" on the class or a '
2372 "superclass this class.",
2373 code="zlpr",
2374 )
2375
2376 else:
2377 return annotated, None
2378
2379 if len(annotated.__args__) != 1:
2380 raise orm_exc.MappedAnnotationError(
2381 "Expected sub-type for Mapped[] annotation"
2382 )
2383
2384 return (
2385 # fix dict/list/set args to be ForwardRef, see #11814
2386 fixup_container_fwd_refs(annotated.__args__[0]),
2387 annotated.__origin__,
2388 )
2389
2390
2391def _mapper_property_as_plain_name(prop: Type[Any]) -> str:
2392 if hasattr(prop, "_mapper_property_name"):
2393 name = prop._mapper_property_name()
2394 else:
2395 name = None
2396 return util.clsname_as_plain_name(prop, name)