1# orm/util.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9from __future__ import annotations
10
11import enum
12import functools
13import re
14import types
15import typing
16from typing import AbstractSet
17from typing import Any
18from typing import Callable
19from typing import cast
20from typing import Dict
21from typing import FrozenSet
22from typing import Generic
23from typing import Iterable
24from typing import Iterator
25from typing import List
26from typing import Match
27from typing import Optional
28from typing import Sequence
29from typing import Tuple
30from typing import Type
31from typing import TYPE_CHECKING
32from typing import TypeVar
33from typing import Union
34import weakref
35
36from . import attributes # noqa
37from . import exc
38from . import exc as orm_exc
39from ._typing import _O
40from ._typing import insp_is_aliased_class
41from ._typing import insp_is_mapper
42from ._typing import prop_is_relationship
43from .base import _class_to_mapper as _class_to_mapper
44from .base import _MappedAnnotationBase
45from .base import _never_set as _never_set # noqa: F401
46from .base import _none_only_set as _none_only_set # noqa: F401
47from .base import _none_set as _none_set # noqa: F401
48from .base import attribute_str as attribute_str # noqa: F401
49from .base import class_mapper as class_mapper
50from .base import DynamicMapped
51from .base import InspectionAttr as InspectionAttr
52from .base import instance_str as instance_str # noqa: F401
53from .base import Mapped
54from .base import object_mapper as object_mapper
55from .base import object_state as object_state # noqa: F401
56from .base import opt_manager_of_class
57from .base import ORMDescriptor
58from .base import state_attribute_str as state_attribute_str # noqa: F401
59from .base import state_class_str as state_class_str # noqa: F401
60from .base import state_str as state_str # noqa: F401
61from .base import WriteOnlyMapped
62from .interfaces import CriteriaOption
63from .interfaces import MapperProperty as MapperProperty
64from .interfaces import ORMColumnsClauseRole
65from .interfaces import ORMEntityColumnsClauseRole
66from .interfaces import ORMFromClauseRole
67from .path_registry import PathRegistry as PathRegistry
68from .. import event
69from .. import exc as sa_exc
70from .. import inspection
71from .. import sql
72from .. import util
73from ..engine.result import result_tuple
74from ..sql import coercions
75from ..sql import expression
76from ..sql import lambdas
77from ..sql import roles
78from ..sql import util as sql_util
79from ..sql import visitors
80from ..sql._typing import is_selectable
81from ..sql.annotation import SupportsCloneAnnotations
82from ..sql.base import ColumnCollection
83from ..sql.cache_key import HasCacheKey
84from ..sql.cache_key import MemoizedHasCacheKey
85from ..sql.elements import ColumnElement
86from ..sql.elements import KeyedColumnElement
87from ..sql.selectable import FromClause
88from ..util.langhelpers import MemoizedSlots
89from ..util.typing import de_stringify_annotation as _de_stringify_annotation
90from ..util.typing import eval_name_only as _eval_name_only
91from ..util.typing import fixup_container_fwd_refs
92from ..util.typing import get_origin
93from ..util.typing import is_origin_of_cls
94from ..util.typing import Literal
95from ..util.typing import Protocol
96
97if typing.TYPE_CHECKING:
98 from ._typing import _EntityType
99 from ._typing import _IdentityKeyType
100 from ._typing import _InternalEntityType
101 from ._typing import _ORMCOLEXPR
102 from .context import _MapperEntity
103 from .context import ORMCompileState
104 from .mapper import Mapper
105 from .path_registry import AbstractEntityRegistry
106 from .query import Query
107 from .relationships import RelationshipProperty
108 from ..engine import Row
109 from ..engine import RowMapping
110 from ..sql._typing import _CE
111 from ..sql._typing import _ColumnExpressionArgument
112 from ..sql._typing import _EquivalentColumnMap
113 from ..sql._typing import _FromClauseArgument
114 from ..sql._typing import _OnClauseArgument
115 from ..sql._typing import _PropagateAttrsType
116 from ..sql.annotation import _SA
117 from ..sql.base import ReadOnlyColumnCollection
118 from ..sql.elements import BindParameter
119 from ..sql.selectable import _ColumnsClauseElement
120 from ..sql.selectable import Select
121 from ..sql.selectable import Selectable
122 from ..sql.visitors import anon_map
123 from ..util.typing import _AnnotationScanType
124
# generic type variable used by annotations in this module
_T = TypeVar("_T", bound=Any)

# every name that CascadeOptions accepts.  "all" and "none" are shorthand
# names: CascadeOptions expands "all" into individual cascades and treats
# "none" as "clear everything"; neither is stored in the resulting set.
all_cascades = frozenset(
    (
        "delete",
        "delete-orphan",
        "all",
        "merge",
        "expunge",
        "save-update",
        "refresh-expire",
        "none",
    )
)

# a partial *of* functools.partial: calling ``_de_stringify_partial(fn)``
# produces ``functools.partial(fn, locals_=<the mapping below>)``, i.e.
# ``fn`` with the ORM annotation container names pre-supplied as ``locals_``
_de_stringify_partial = functools.partial(
    functools.partial,
    locals_=util.immutabledict(
        {
            "Mapped": Mapped,
            "WriteOnlyMapped": WriteOnlyMapped,
            "DynamicMapped": DynamicMapped,
        }
    ),
)
150
151# partial is practically useless as we have to write out the whole
152# function and maintain the signature anyway
153
154
class _DeStringifyAnnotation(Protocol):
    """Typing-only description of the call signature of
    ``de_stringify_annotation``.

    ``locals_`` does not appear in this signature; it is pre-bound by
    ``_de_stringify_partial``.

    """

    def __call__(
        self,
        cls: Type[Any],
        annotation: _AnnotationScanType,
        originating_module: str,
        *,
        str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
        include_generic: bool = False,
    ) -> Type[Any]: ...


# the partial object carries no usable signature for type checkers, so it
# is cast to the explicitly written Protocol
de_stringify_annotation = cast(
    _DeStringifyAnnotation, _de_stringify_partial(_de_stringify_annotation)
)
170
171
class _EvalNameOnly(Protocol):
    """Typing-only description of the call signature of ``eval_name_only``.

    ``locals_`` does not appear in this signature; it is pre-bound by
    ``_de_stringify_partial``.

    """

    def __call__(self, name: str, module_name: str) -> Any: ...


# same pattern as de_stringify_annotation: partial with ``locals_``
# pre-bound, cast to the explicit Protocol for typing purposes
eval_name_only = cast(_EvalNameOnly, _de_stringify_partial(_eval_name_only))
177
178
class CascadeOptions(FrozenSet[str]):
    """Immutable set of the cascade names given to
    :paramref:`.relationship.cascade`.

    In addition to behaving as a ``frozenset`` of the effective cascade
    names, each individual cascade is available as a boolean attribute,
    e.g. ``save_update`` or ``delete_orphan``.

    """

    # the cascades implied by "all"; "delete-orphan" is not among them
    _add_w_all_cascades = all_cascades.difference(
        ["all", "none", "delete-orphan"]
    )
    _allowed_cascades = all_cascades

    _viewonly_cascades = ["expunge", "all", "none", "refresh-expire", "merge"]

    __slots__ = (
        "save_update",
        "delete",
        "refresh_expire",
        "merge",
        "expunge",
        "delete_orphan",
    )

    save_update: bool
    delete: bool
    refresh_expire: bool
    merge: bool
    expunge: bool
    delete_orphan: bool

    def __new__(
        cls, value_list: Optional[Union[Iterable[str], str]]
    ) -> CascadeOptions:
        """Construct from an iterable of cascade names, a comma-separated
        string of names, or ``None`` (no cascades).

        :raises ArgumentError: if a name outside of ``_allowed_cascades``
         is present.

        """
        if value_list is None or isinstance(value_list, str):
            # strings and None are parsed, then routed back through here
            return cls.from_string(value_list)  # type: ignore

        values = set(value_list)

        unknown = values - cls._allowed_cascades
        if unknown:
            raise sa_exc.ArgumentError(
                "Invalid cascade option(s): %s"
                % ", ".join(repr(name) for name in sorted(unknown))
            )

        # expand / apply the shorthand names; "none" wins over anything
        # else that was given, and "all" itself is never stored
        if "all" in values:
            values |= cls._add_w_all_cascades
        if "none" in values:
            values.clear()
        values.discard("all")

        self = super().__new__(cls, values)
        self.save_update = "save-update" in values
        self.delete = "delete" in values
        self.refresh_expire = "refresh-expire" in values
        self.merge = "merge" in values
        self.expunge = "expunge" in values
        self.delete_orphan = "delete-orphan" in values

        if self.delete_orphan and not self.delete:
            util.warn("The 'delete-orphan' cascade option requires 'delete'.")
        return self

    def __repr__(self):
        return "CascadeOptions(%r)" % ",".join(sorted(self))

    @classmethod
    def from_string(cls, arg):
        """Build a :class:`.CascadeOptions` from a comma-separated string;
        whitespace around the commas is ignored and ``None`` / empty input
        produces an empty set of cascades.
        """
        tokens = re.split(r"\s*,\s*", arg or "")
        return cls([token for token in tokens if token])
250
251
def _validator_events(desc, key, validator, include_removes, include_backrefs):
    """Set up attribute event listeners which pass values being set on or
    appended to an attribute (and optionally removed from it) through a
    validation function.

    :param desc: the event target handed to :func:`.event.listen`.
    :param key: attribute name; passed to the validator and used to look
     up this attribute's implementation in ``state.manager``.
    :param validator: callable invoked as ``validator(obj, key, value)``,
     or as ``validator(obj, key, value, is_remove)`` when
     ``include_removes`` is set.
    :param include_removes: when true, a "remove" listener is established
     as well and the validator receives the extra boolean argument.
    :param include_backrefs: when false, events whose initiator refers to
     an attribute implementation other than this one are not validated.

    """

    def initiated_elsewhere(state, initiator):
        # true when the event's initiator is not this attribute's own impl
        return initiator.impl is not state.manager[key].impl

    if include_removes:
        # four-argument validator form: the last argument is "is_remove"

        def append(state, value, initiator):
            # members of a bulk replace are validated by bulk_set()
            if initiator.op is attributes.OP_BULK_REPLACE:
                return value
            if not include_backrefs and initiated_elsewhere(state, initiator):
                return value
            return validator(state.obj(), key, value, False)

        def bulk_set(state, values, initiator):
            if not include_backrefs and initiated_elsewhere(state, initiator):
                return
            obj = state.obj()
            # replace the contents of the incoming list in place
            values[:] = [
                validator(obj, key, value, False) for value in values
            ]

        def set_(state, value, oldvalue, initiator):
            if not include_backrefs and initiated_elsewhere(state, initiator):
                return value
            return validator(state.obj(), key, value, False)

        def remove(state, value, initiator):
            if not include_backrefs and initiated_elsewhere(state, initiator):
                return
            validator(state.obj(), key, value, True)

    else:
        # three-argument validator form

        def append(state, value, initiator):
            # members of a bulk replace are validated by bulk_set()
            if initiator.op is attributes.OP_BULK_REPLACE:
                return value
            if not include_backrefs and initiated_elsewhere(state, initiator):
                return value
            return validator(state.obj(), key, value)

        def bulk_set(state, values, initiator):
            if not include_backrefs and initiated_elsewhere(state, initiator):
                return
            obj = state.obj()
            # replace the contents of the incoming list in place
            values[:] = [validator(obj, key, value) for value in values]

        def set_(state, value, oldvalue, initiator):
            if not include_backrefs and initiated_elsewhere(state, initiator):
                return value
            return validator(state.obj(), key, value)

    event.listen(desc, "append", append, raw=True, retval=True)
    event.listen(desc, "bulk_replace", bulk_set, raw=True)
    event.listen(desc, "set", set_, raw=True, retval=True)
    if include_removes:
        event.listen(desc, "remove", remove, raw=True, retval=True)
316
317
def polymorphic_union(
    table_map, typecolname, aliasname="p_union", cast_nulls=True
):
    """Create the aliased ``UNION ALL`` statement used by a polymorphic
    mapper.

    See :ref:`concrete_inheritance` for an example of how
    this is used.

    Each table contributes one SELECT which lists every column key found
    across all of the tables; keys a given table lacks are rendered as
    labeled NULLs.  The values of ``table_map`` are replaced in place with
    their coerced FROM clause form.

    :param table_map: mapping of polymorphic identities to
     :class:`_schema.Table` objects.
    :param typecolname: string name of a "discriminator" column, which will be
     derived from the query, producing the polymorphic identity for
     each row.  If ``None``, no polymorphic discriminator is generated.
    :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()`
     construct generated.
    :param cast_nulls: if True, non-existent columns, which are represented
     as labeled NULLs, will be passed into CAST.   This is a legacy behavior
     that is problematic on some backends such as Oracle - in which case it
     can be set to False.

    """

    colnames: util.OrderedSet[str] = util.OrderedSet()
    columns_by_table = {}
    coltypes = {}

    for key in table_map:
        table = coercions.expect(
            roles.StrictFromClauseRole, table_map[key], allow_select=True
        )
        # store the coerced form so the SELECT-building loop below uses it
        table_map[key] = table

        by_key = {}
        for c in table.c:
            if c.key == typecolname:
                raise sa_exc.InvalidRequestError(
                    "Polymorphic union can't use '%s' as the discriminator "
                    "column due to mapped column %r; please apply the "
                    "'typecolname' "
                    "argument; this is available on "
                    "ConcreteBase as '_concrete_discriminator_name'"
                    % (typecolname, c)
                )
            colnames.add(c.key)
            by_key[c.key] = c
            coltypes[c.key] = c.type
        columns_by_table[table] = by_key

    def col(name, table):
        # the table's own column if present, else a typed NULL placeholder
        try:
            return columns_by_table[table][name]
        except KeyError:
            typed_null = sql.cast if cast_nulls else sql.type_coerce
            return typed_null(sql.null(), coltypes[name]).label(name)

    selects = []
    for type_, table in table_map.items():
        columns = [col(name, table) for name in colnames]
        if typecolname is not None:
            # the polymorphic identity is rendered as a literal column
            columns.append(
                sql.literal_column(sql_util._quote_ddl_expr(type_)).label(
                    typecolname
                )
            )
        selects.append(sql.select(*columns).select_from(table))
    return sql.union_all(*selects).alias(aliasname)
398
399
def identity_key(
    class_: Optional[Type[_T]] = None,
    ident: Union[Any, Tuple[Any, ...]] = None,
    *,
    instance: Optional[_T] = None,
    row: Optional[Union[Row[Any], RowMapping]] = None,
    identity_token: Optional[Any] = None,
) -> _IdentityKeyType[_T]:
    r"""Generate "identity key" tuples, as are used as keys in the
    :attr:`.Session.identity_map` dictionary.

    Three call styles are accepted:

    * ``identity_key(class, ident, identity_token=token)``

      Given a mapped class along with a primary key value, which may be
      a scalar or a tuple.

      E.g.::

        >>> identity_key(MyClass, (1, 2))
        (<class '__main__.MyClass'>, (1, 2), None)

      :param class: mapped class (must be a positional argument)
      :param ident: primary key, may be a scalar or tuple argument.
      :param identity_token: optional identity token

        .. versionadded:: 1.2 added identity_token


    * ``identity_key(instance=instance)``

      Produces the identity key for the given instance.  The instance
      need not be persistent, only that its primary key attributes
      are populated (else the key will contain ``None`` for those missing
      values).

      E.g.::

        >>> instance = MyClass(1, 2)
        >>> identity_key(instance=instance)
        (<class '__main__.MyClass'>, (1, 2), None)

      In this form, the given instance is ultimately run through
      :meth:`_orm.Mapper.identity_key_from_instance`, which will have the
      effect of performing a database check for the corresponding row
      if the object is expired.

      :param instance: object instance (must be given as a keyword arg)

    * ``identity_key(class, row=row, identity_token=token)``

      Like the class/tuple form, except that the primary key is taken from
      a database result row given as a :class:`.Row` or
      :class:`.RowMapping` object.

      E.g.::

        >>> row = engine.execute(text("select * from table where a=1 and b=2")).first()
        >>> identity_key(MyClass, row=row)
        (<class '__main__.MyClass'>, (1, 2), None)

      :param class: mapped class (must be a positional argument)
      :param row: :class:`.Row` row returned by a :class:`_engine.CursorResult`
       (must be given as a keyword arg)
      :param identity_token: optional identity token

        .. versionadded:: 1.2 added identity_token

    :raises ArgumentError: if neither a class nor an instance is given, or
     if a class is given with neither ``ident`` nor ``row``.

    """  # noqa: E501
    if class_ is None:
        # instance form
        if instance is None:
            raise sa_exc.ArgumentError("class or instance is required")
        return object_mapper(instance).identity_key_from_instance(instance)

    mapper = class_mapper(class_)

    # a row, when given, takes precedence over ident
    if row is not None:
        return mapper.identity_key_from_row(
            row, identity_token=identity_token
        )

    if ident is None:
        raise sa_exc.ArgumentError("ident or row is required")

    primary_key = tuple(util.to_list(ident))
    return mapper.identity_key_from_primary_key(
        primary_key, identity_token=identity_token
    )
486
487
488class _TraceAdaptRole(enum.Enum):
489 """Enumeration of all the use cases for ORMAdapter.
490
491 ORMAdapter remains one of the most complicated aspects of the ORM, as it is
492 used for in-place adaption of column expressions to be applied to a SELECT,
493 replacing :class:`.Table` and other objects that are mapped to classes with
494 aliases of those tables in the case of joined eager loading, or in the case
495 of polymorphic loading as used with concrete mappings or other custom "with
496 polymorphic" parameters, with whole user-defined subqueries. The
497 enumerations provide an overview of all the use cases used by ORMAdapter, a
498 layer of formality as to the introduction of new ORMAdapter use cases (of
499 which none are anticipated), as well as a means to trace the origins of a
500 particular ORMAdapter within runtime debugging.
501
502 SQLAlchemy 2.0 has greatly scaled back ORM features which relied heavily on
503 open-ended statement adaption, including the ``Query.with_polymorphic()``
504 method and the ``Query.select_from_entity()`` methods, favoring
505 user-explicit aliasing schemes using the ``aliased()`` and
506 ``with_polymorphic()`` standalone constructs; these still use adaption,
507 however the adaption is applied in a narrower scope.
508
509 """
510
511 # aliased() use that is used to adapt individual attributes at query
512 # construction time
513 ALIASED_INSP = enum.auto()
514
515 # joinedload cases; typically adapt an ON clause of a relationship
516 # join
517 JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto()
518 JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto()
519 JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto()
520
521 # polymorphic cases - these are complex ones that replace FROM
522 # clauses, replacing tables with subqueries
523 MAPPER_POLYMORPHIC_ADAPTER = enum.auto()
524 WITH_POLYMORPHIC_ADAPTER = enum.auto()
525 WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto()
526 DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto()
527
528 # the from_statement() case, used only to adapt individual attributes
529 # from a given statement to local ORM attributes at result fetching
530 # time. assigned to ORMCompileState._from_obj_alias
531 ADAPT_FROM_STATEMENT = enum.auto()
532
533 # the joinedload for queries that have LIMIT/OFFSET/DISTINCT case;
534 # the query is placed inside of a subquery with the LIMIT/OFFSET/etc.,
535 # joinedloads are then placed on the outside.
536 # assigned to ORMCompileState.compound_eager_adapter
537 COMPOUND_EAGER_STATEMENT = enum.auto()
538
539 # the legacy Query._set_select_from() case.
540 # this is needed for Query's set operations (i.e. UNION, etc. )
541 # as well as "legacy from_self()", which while removed from 2.0 as
542 # public API, is used for the Query.count() method. this one
543 # still does full statement traversal
544 # assigned to ORMCompileState._from_obj_alias
545 LEGACY_SELECT_FROM_ALIAS = enum.auto()
546
547
class ORMStatementAdapter(sql_util.ColumnAdapter):
    """ColumnAdapter which includes a role attribute.

    :param role: the :class:`._TraceAdaptRole` member stored on the adapter
     as ``.role``.
    :param selectable: passed positionally to the ``ColumnAdapter``
     constructor.

    All remaining keyword-only arguments are forwarded unchanged, under the
    same names, to the ``ColumnAdapter`` constructor.

    """

    __slots__ = ("role",)

    def __init__(
        self,
        role: _TraceAdaptRole,
        selectable: Selectable,
        *,
        equivalents: Optional[_EquivalentColumnMap] = None,
        adapt_required: bool = False,
        allow_label_resolve: bool = True,
        anonymize_labels: bool = False,
        adapt_on_names: bool = False,
        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
    ):
        # the role is recorded before the base constructor runs
        self.role = role
        super().__init__(
            selectable,
            equivalents=equivalents,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            adapt_on_names=adapt_on_names,
            adapt_from_selectables=adapt_from_selectables,
        )
575
576
class ORMAdapter(sql_util.ColumnAdapter):
    """ColumnAdapter subclass bound to an ORM entity, which by default
    declines to adapt elements that belong to non-matching mappers.

    """

    __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp")

    is_aliased_class: bool
    aliased_insp: Optional[AliasedInsp[Any]]

    def __init__(
        self,
        role: _TraceAdaptRole,
        entity: _InternalEntityType[Any],
        *,
        equivalents: Optional[_EquivalentColumnMap] = None,
        adapt_required: bool = False,
        allow_label_resolve: bool = True,
        anonymize_labels: bool = False,
        selectable: Optional[Selectable] = None,
        limit_on_entity: bool = True,
        adapt_on_names: bool = False,
        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
    ):
        """Construct the adapter for ``entity``.

        :param role: :class:`._TraceAdaptRole` member stored as ``.role``.
        :param entity: inspected entity; its ``.mapper`` is stored and, if
         no ``selectable`` is given, its ``.selectable`` is adapted to.
        :param selectable: optional selectable which overrides
         ``entity.selectable``.
        :param limit_on_entity: when true (the default), ``_include_fn``
         is installed as the adapter's ``include_fn``.

        The remaining arguments are passed along to the ``ColumnAdapter``
        constructor.

        """
        self.role = role
        self.mapper = entity.mapper

        target = entity.selectable if selectable is None else selectable

        if insp_is_aliased_class(entity):
            self.is_aliased_class = True
            self.aliased_insp = entity
        else:
            self.is_aliased_class = False
            self.aliased_insp = None

        include_fn = self._include_fn if limit_on_entity else None

        super().__init__(
            target,
            equivalents,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            include_fn=include_fn,
            adapt_on_names=adapt_on_names,
            adapt_from_selectables=adapt_from_selectables,
        )

    def _include_fn(self, elem):
        """Decide whether ``elem`` should be adapted, based on its
        "parentmapper" annotation.
        """
        parent = elem._annotations.get("parentmapper", None)

        # elements with no parentmapper annotation are always included
        if not parent:
            return True

        # otherwise the two mappers must be related in either direction
        return parent.isa(self.mapper) or self.mapper.isa(parent)
628
629
class AliasedClass(
    inspection.Inspectable["AliasedInsp[_O]"], ORMColumnsClauseRole[_O]
):
    r"""Represents an "aliased" form of a mapped class for usage with Query.

    The ORM equivalent of a :func:`~sqlalchemy.sql.expression.alias`
    construct, this object mimics the mapped class using a
    ``__getattr__`` scheme and maintains a reference to a
    real :class:`~sqlalchemy.sql.expression.Alias` object.

    A primary purpose of :class:`.AliasedClass` is to serve as an alternate
    within a SQL statement generated by the ORM, such that an existing
    mapped entity can be used in multiple contexts. A simple example::

        # find all pairs of users with the same name
        user_alias = aliased(User)
        session.query(User, user_alias).join(
            (user_alias, User.id > user_alias.id)
        ).filter(User.name == user_alias.name)

    :class:`.AliasedClass` is also capable of mapping an existing mapped
    class to an entirely new selectable, provided this selectable is column-
    compatible with the existing mapped selectable, and it can also be
    configured in a mapping as the target of a :func:`_orm.relationship`.
    See the links below for examples.

    The :class:`.AliasedClass` object is constructed typically using the
    :func:`_orm.aliased` function. It also is produced with additional
    configuration when using the :func:`_orm.with_polymorphic` function.

    The resulting object is an instance of :class:`.AliasedClass`.
    This object implements an attribute scheme which produces the
    same attribute and method interface as the original mapped
    class, allowing :class:`.AliasedClass` to be compatible
    with any attribute technique which works on the original class,
    including hybrid attributes (see :ref:`hybrids_toplevel`).

    The :class:`.AliasedClass` can be inspected for its underlying
    :class:`_orm.Mapper`, aliased selectable, and other information
    using :func:`_sa.inspect`::

        from sqlalchemy import inspect

        my_alias = aliased(MyClass)
        insp = inspect(my_alias)

    The resulting inspection object is an instance of :class:`.AliasedInsp`.


    .. seealso::

        :func:`.aliased`

        :func:`.with_polymorphic`

        :ref:`relationship_aliased_class`

        :ref:`relationship_to_window_function`


    """

    __name__: str

    def __init__(
        self,
        mapped_class_or_ac: _EntityType[_O],
        alias: Optional[FromClause] = None,
        name: Optional[str] = None,
        flat: bool = False,
        adapt_on_names: bool = False,
        with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]] = None,
        with_polymorphic_discriminator: Optional[ColumnElement[Any]] = None,
        base_alias: Optional[AliasedInsp[Any]] = None,
        use_mapper_path: bool = False,
        represents_outer_join: bool = False,
    ):
        """Construct a new :class:`.AliasedClass` and its
        :class:`.AliasedInsp`.

        :param mapped_class_or_ac: the entity to alias; it is run through
         :func:`.inspection.inspect` and the mapper of the result is used.
        :param alias: the FROM clause to select from.  When omitted, one is
         generated from the inspected entity (see the inline comments).
        :param name: name given to a generated alias, and passed to the
         :class:`.AliasedInsp`.
        :param flat: passed along when an anonymous alias is generated.
        :param with_polymorphic_mappers: if empty or ``None``, the mapper's
         own ``with_polymorphic_mappers`` is used.
        :param with_polymorphic_discriminator: if ``None``, the mapper's
         ``polymorphic_on`` is used.

        ``adapt_on_names``, ``base_alias``, ``use_mapper_path`` and
        ``represents_outer_join`` are passed through to
        :class:`.AliasedInsp`.

        """
        insp = cast(
            "_InternalEntityType[_O]", inspection.inspect(mapped_class_or_ac)
        )
        mapper = insp.mapper

        nest_adapters = False

        if alias is None:
            if insp.is_aliased_class and insp.selectable._is_subquery:
                # aliasing an aliased class that is itself against a
                # subquery: alias that subquery directly
                alias = insp.selectable.alias()
            else:
                # otherwise build an anonymous alias of the mapper's
                # with-polymorphic selectable
                alias = (
                    mapper._with_polymorphic_selectable._anonymous_fromclause(
                        name=name,
                        flat=flat,
                    )
                )
        elif insp.is_aliased_class:
            # an explicit alias was given for an entity that is already
            # aliased; AliasedInsp is told to nest the adapters
            nest_adapters = True

        assert alias is not None
        self._aliased_insp = AliasedInsp(
            self,
            insp,
            alias,
            name,
            (
                with_polymorphic_mappers
                if with_polymorphic_mappers
                else mapper.with_polymorphic_mappers
            ),
            (
                with_polymorphic_discriminator
                if with_polymorphic_discriminator is not None
                else mapper.polymorphic_on
            ),
            base_alias,
            use_mapper_path,
            adapt_on_names,
            represents_outer_join,
            nest_adapters,
        )

        self.__name__ = f"aliased({mapper.class_.__name__})"

    @classmethod
    def _reconstitute_from_aliased_insp(
        cls, aliased_insp: AliasedInsp[_O]
    ) -> AliasedClass[_O]:
        """Create an :class:`.AliasedClass` around an existing
        :class:`.AliasedInsp` without running ``__init__``.

        For a with-polymorphic inspection, an :class:`.AliasedClass` is
        also reconstituted for each sub-entity and set as an attribute
        named after the sub-entity's class.

        """
        # bypass __init__, which would build a new AliasedInsp
        obj = cls.__new__(cls)
        obj.__name__ = f"aliased({aliased_insp.mapper.class_.__name__})"
        obj._aliased_insp = aliased_insp

        if aliased_insp._is_with_polymorphic:
            for sub_aliased_insp in aliased_insp._with_polymorphic_entities:
                if sub_aliased_insp is not aliased_insp:
                    ent = AliasedClass._reconstitute_from_aliased_insp(
                        sub_aliased_insp
                    )
                    setattr(obj, sub_aliased_insp.class_.__name__, ent)

        return obj

    def __getattr__(self, key: str) -> Any:
        """Proxy attribute access to the aliased target, re-binding
        methods and descriptors to this object and adapting ORM attributes
        to this alias.

        Python only calls this for names not found through normal lookup;
        adapted attributes are stored on the instance so that later access
        does not come back here.

        """
        try:
            # read from __dict__ directly: a plain attribute access would
            # re-enter __getattr__ if _aliased_insp is not yet present
            _aliased_insp = self.__dict__["_aliased_insp"]
        except KeyError:
            raise AttributeError()
        else:
            target = _aliased_insp._target
            # maintain all getattr mechanics
            attr = getattr(target, key)

        # attribute is a method, that will be invoked against a
        # "self"; so just return a new method with the same function and
        # new self
        if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
            return types.MethodType(attr.__func__, self)

        # attribute is a descriptor, that will be invoked against a
        # "self"; so invoke the descriptor against this self
        if hasattr(attr, "__get__"):
            attr = attr.__get__(None, self)

        # attributes within the QueryableAttribute system will want this
        # to be invoked so the object can be adapted
        if hasattr(attr, "adapt_to_entity"):
            attr = attr.adapt_to_entity(_aliased_insp)
            # store the adapted attribute on the instance
            setattr(self, key, attr)

        return attr

    def _get_from_serialized(
        self, key: str, mapped_class: _O, aliased_insp: AliasedInsp[_O]
    ) -> Any:
        """Variant of ``__getattr__`` which takes the mapped class and the
        :class:`.AliasedInsp` as explicit arguments rather than reading
        them from ``self``.
        """
        # this method is only used in terms of the
        # sqlalchemy.ext.serializer extension
        attr = getattr(mapped_class, key)
        if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
            return types.MethodType(attr.__func__, self)

        # attribute is a descriptor, that will be invoked against a
        # "self"; so invoke the descriptor against this self
        if hasattr(attr, "__get__"):
            attr = attr.__get__(None, self)

        # attributes within the QueryableAttribute system will want this
        # to be invoked so the object can be adapted
        if hasattr(attr, "adapt_to_entity"):
            # point the inspection object's weak entity reference at this
            # object before adapting
            aliased_insp._weak_entity = weakref.ref(self)
            attr = attr.adapt_to_entity(aliased_insp)
            setattr(self, key, attr)

        return attr

    def __repr__(self) -> str:
        """Return a representation with this object's id and the name of
        the aliased target."""
        return "<AliasedClass at 0x%x; %s>" % (
            id(self),
            self._aliased_insp._target.__name__,
        )

    def __str__(self) -> str:
        """Return the string form of the associated
        :class:`.AliasedInsp`."""
        return str(self._aliased_insp)
830
831
832@inspection._self_inspects
833class AliasedInsp(
834 ORMEntityColumnsClauseRole[_O],
835 ORMFromClauseRole,
836 HasCacheKey,
837 InspectionAttr,
838 MemoizedSlots,
839 inspection.Inspectable["AliasedInsp[_O]"],
840 Generic[_O],
841):
842 """Provide an inspection interface for an
843 :class:`.AliasedClass` object.
844
845 The :class:`.AliasedInsp` object is returned
846 given an :class:`.AliasedClass` using the
847 :func:`_sa.inspect` function::
848
849 from sqlalchemy import inspect
850 from sqlalchemy.orm import aliased
851
852 my_alias = aliased(MyMappedClass)
853 insp = inspect(my_alias)
854
855 Attributes on :class:`.AliasedInsp`
856 include:
857
858 * ``entity`` - the :class:`.AliasedClass` represented.
859 * ``mapper`` - the :class:`_orm.Mapper` mapping the underlying class.
860 * ``selectable`` - the :class:`_expression.Alias`
861 construct which ultimately
862 represents an aliased :class:`_schema.Table` or
863 :class:`_expression.Select`
864 construct.
865 * ``name`` - the name of the alias. Also is used as the attribute
866 name when returned in a result tuple from :class:`_query.Query`.
867 * ``with_polymorphic_mappers`` - collection of :class:`_orm.Mapper`
868 objects
869 indicating all those mappers expressed in the select construct
870 for the :class:`.AliasedClass`.
871 * ``polymorphic_on`` - an alternate column or SQL expression which
872 will be used as the "discriminator" for a polymorphic load.
873
874 .. seealso::
875
876 :ref:`inspection_toplevel`
877
878 """
879
880 __slots__ = (
881 "__weakref__",
882 "_weak_entity",
883 "mapper",
884 "selectable",
885 "name",
886 "_adapt_on_names",
887 "with_polymorphic_mappers",
888 "polymorphic_on",
889 "_use_mapper_path",
890 "_base_alias",
891 "represents_outer_join",
892 "persist_selectable",
893 "local_table",
894 "_is_with_polymorphic",
895 "_with_polymorphic_entities",
896 "_adapter",
897 "_target",
898 "__clause_element__",
899 "_memoized_values",
900 "_all_column_expressions",
901 "_nest_adapters",
902 )
903
904 _cache_key_traversal = [
905 ("name", visitors.ExtendedInternalTraversal.dp_string),
906 ("_adapt_on_names", visitors.ExtendedInternalTraversal.dp_boolean),
907 ("_use_mapper_path", visitors.ExtendedInternalTraversal.dp_boolean),
908 ("_target", visitors.ExtendedInternalTraversal.dp_inspectable),
909 ("selectable", visitors.ExtendedInternalTraversal.dp_clauseelement),
910 (
911 "with_polymorphic_mappers",
912 visitors.InternalTraversal.dp_has_cache_key_list,
913 ),
914 ("polymorphic_on", visitors.InternalTraversal.dp_clauseelement),
915 ]
916
917 mapper: Mapper[_O]
918 selectable: FromClause
919 _adapter: ORMAdapter
920 with_polymorphic_mappers: Sequence[Mapper[Any]]
921 _with_polymorphic_entities: Sequence[AliasedInsp[Any]]
922
923 _weak_entity: weakref.ref[AliasedClass[_O]]
924 """the AliasedClass that refers to this AliasedInsp"""
925
926 _target: Union[Type[_O], AliasedClass[_O]]
927 """the thing referenced by the AliasedClass/AliasedInsp.
928
929 In the vast majority of cases, this is the mapped class. However
930 it may also be another AliasedClass (alias of alias).
931
932 """
933
    def __init__(
        self,
        entity: AliasedClass[_O],
        inspected: _InternalEntityType[_O],
        selectable: FromClause,
        name: Optional[str],
        with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]],
        polymorphic_on: Optional[ColumnElement[Any]],
        _base_alias: Optional[AliasedInsp[Any]],
        _use_mapper_path: bool,
        adapt_on_names: bool,
        represents_outer_join: bool,
        nest_adapters: bool,
    ):
        """Construct the inspection object for ``entity``.

        :param entity: the owning :class:`.AliasedClass`; only a weak
         reference to it is stored.
        :param inspected: inspection of the entity being aliased; supplies
         the mapper and the aliased target (``inspected.entity``).
        :param selectable: the FROM clause this alias selects from; stored
         as ``selectable``, ``persist_selectable`` and ``local_table``.
        :param with_polymorphic_mappers: when non-empty, an
         :class:`.AliasedClass` is created for every mapper in the
         sequence other than the primary one.
        :param _base_alias: stored as a weak reference; defaults to this
         object itself.
        :param nest_adapters: when true, ``inspected`` must be an
         :class:`.AliasedInsp` and its adapter is wrapped around the one
         created here.

        """
        mapped_class_or_ac = inspected.entity
        mapper = inspected.mapper

        self._weak_entity = weakref.ref(entity)
        self.mapper = mapper
        self.selectable = self.persist_selectable = self.local_table = (
            selectable
        )
        self.name = name
        self.polymorphic_on = polymorphic_on
        self._base_alias = weakref.ref(_base_alias or self)
        self._use_mapper_path = _use_mapper_path
        self.represents_outer_join = represents_outer_join
        self._nest_adapters = nest_adapters

        if with_polymorphic_mappers:
            self._is_with_polymorphic = True
            self.with_polymorphic_mappers = with_polymorphic_mappers
            self._with_polymorphic_entities = []
            for poly in self.with_polymorphic_mappers:
                if poly is not mapper:
                    # sub-entity against the same selectable, with this
                    # object as its base alias
                    ent = AliasedClass(
                        poly.class_,
                        selectable,
                        base_alias=self,
                        adapt_on_names=adapt_on_names,
                        use_mapper_path=_use_mapper_path,
                    )

                    # expose the sub-entity on the owning entity under the
                    # subclass name.
                    # NOTE(review): ``self.entity`` is defined outside the
                    # visible source; presumably it dereferences
                    # ``_weak_entity`` - confirm.
                    setattr(self.entity, poly.class_.__name__, ent)
                    self._with_polymorphic_entities.append(ent._aliased_insp)

        else:
            # ``_with_polymorphic_entities`` is not assigned in this branch
            self._is_with_polymorphic = False
            self.with_polymorphic_mappers = [mapper]

        self._adapter = ORMAdapter(
            _TraceAdaptRole.ALIASED_INSP,
            mapper,
            selectable=selectable,
            equivalents=mapper._equivalent_columns,
            adapt_on_names=adapt_on_names,
            anonymize_labels=True,
            # make sure the adapter doesn't try to grab other tables that
            # are not even the thing we are mapping, such as embedded
            # selectables in subqueries or CTEs.  See issue #6060
            # (the set comes out empty when adapt_on_names is true)
            adapt_from_selectables={
                m.selectable
                for m in self.with_polymorphic_mappers
                if not adapt_on_names
            },
            limit_on_entity=False,
        )

        if nest_adapters:
            # supports "aliased class of aliased class" use case
            assert isinstance(inspected, AliasedInsp)
            self._adapter = inspected._adapter.wrap(self._adapter)

        self._adapt_on_names = adapt_on_names
        self._target = mapped_class_or_ac
1009
1010 @classmethod
1011 def _alias_factory(
1012 cls,
1013 element: Union[_EntityType[_O], FromClause],
1014 alias: Optional[FromClause] = None,
1015 name: Optional[str] = None,
1016 flat: bool = False,
1017 adapt_on_names: bool = False,
1018 ) -> Union[AliasedClass[_O], FromClause]:
1019 if isinstance(element, FromClause):
1020 if adapt_on_names:
1021 raise sa_exc.ArgumentError(
1022 "adapt_on_names only applies to ORM elements"
1023 )
1024 if name:
1025 return element.alias(name=name, flat=flat)
1026 else:
1027 return coercions.expect(
1028 roles.AnonymizedFromClauseRole, element, flat=flat
1029 )
1030 else:
1031 return AliasedClass(
1032 element,
1033 alias=alias,
1034 flat=flat,
1035 name=name,
1036 adapt_on_names=adapt_on_names,
1037 )
1038
1039 @classmethod
1040 def _with_polymorphic_factory(
1041 cls,
1042 base: Union[Type[_O], Mapper[_O]],
1043 classes: Union[Literal["*"], Iterable[_EntityType[Any]]],
1044 selectable: Union[Literal[False, None], FromClause] = False,
1045 flat: bool = False,
1046 polymorphic_on: Optional[ColumnElement[Any]] = None,
1047 aliased: bool = False,
1048 innerjoin: bool = False,
1049 adapt_on_names: bool = False,
1050 name: Optional[str] = None,
1051 _use_mapper_path: bool = False,
1052 ) -> AliasedClass[_O]:
1053 primary_mapper = _class_to_mapper(base)
1054
1055 if selectable not in (None, False) and flat:
1056 raise sa_exc.ArgumentError(
1057 "the 'flat' and 'selectable' arguments cannot be passed "
1058 "simultaneously to with_polymorphic()"
1059 )
1060
1061 mappers, selectable = primary_mapper._with_polymorphic_args(
1062 classes, selectable, innerjoin=innerjoin
1063 )
1064 if aliased or flat:
1065 assert selectable is not None
1066 selectable = selectable._anonymous_fromclause(flat=flat)
1067
1068 return AliasedClass(
1069 base,
1070 selectable,
1071 name=name,
1072 with_polymorphic_mappers=mappers,
1073 adapt_on_names=adapt_on_names,
1074 with_polymorphic_discriminator=polymorphic_on,
1075 use_mapper_path=_use_mapper_path,
1076 represents_outer_join=not innerjoin,
1077 )
1078
1079 @property
1080 def entity(self) -> AliasedClass[_O]:
1081 # to eliminate reference cycles, the AliasedClass is held weakly.
1082 # this produces some situations where the AliasedClass gets lost,
1083 # particularly when one is created internally and only the AliasedInsp
1084 # is passed around.
1085 # to work around this case, we just generate a new one when we need
1086 # it, as it is a simple class with very little initial state on it.
1087 ent = self._weak_entity()
1088 if ent is None:
1089 ent = AliasedClass._reconstitute_from_aliased_insp(self)
1090 self._weak_entity = weakref.ref(ent)
1091 return ent
1092
    # class-level flag, constant for every AliasedInsp
    is_aliased_class = True
    "always returns True"
1095
1096 def _memoized_method___clause_element__(self) -> FromClause:
1097 return self.selectable._annotate(
1098 {
1099 "parentmapper": self.mapper,
1100 "parententity": self,
1101 "entity_namespace": self,
1102 }
1103 )._set_propagate_attrs(
1104 {"compile_state_plugin": "orm", "plugin_subject": self}
1105 )
1106
1107 @property
1108 def entity_namespace(self) -> AliasedClass[_O]:
1109 return self.entity
1110
1111 @property
1112 def class_(self) -> Type[_O]:
1113 """Return the mapped class ultimately represented by this
1114 :class:`.AliasedInsp`."""
1115 return self.mapper.class_
1116
1117 @property
1118 def _path_registry(self) -> AbstractEntityRegistry:
1119 if self._use_mapper_path:
1120 return self.mapper._path_registry
1121 else:
1122 return PathRegistry.per_mapper(self)
1123
1124 def __getstate__(self) -> Dict[str, Any]:
1125 return {
1126 "entity": self.entity,
1127 "mapper": self.mapper,
1128 "alias": self.selectable,
1129 "name": self.name,
1130 "adapt_on_names": self._adapt_on_names,
1131 "with_polymorphic_mappers": self.with_polymorphic_mappers,
1132 "with_polymorphic_discriminator": self.polymorphic_on,
1133 "base_alias": self._base_alias(),
1134 "use_mapper_path": self._use_mapper_path,
1135 "represents_outer_join": self.represents_outer_join,
1136 "nest_adapters": self._nest_adapters,
1137 }
1138
1139 def __setstate__(self, state: Dict[str, Any]) -> None:
1140 self.__init__( # type: ignore
1141 state["entity"],
1142 state["mapper"],
1143 state["alias"],
1144 state["name"],
1145 state["with_polymorphic_mappers"],
1146 state["with_polymorphic_discriminator"],
1147 state["base_alias"],
1148 state["use_mapper_path"],
1149 state["adapt_on_names"],
1150 state["represents_outer_join"],
1151 state["nest_adapters"],
1152 )
1153
1154 def _merge_with(self, other: AliasedInsp[_O]) -> AliasedInsp[_O]:
1155 # assert self._is_with_polymorphic
1156 # assert other._is_with_polymorphic
1157
1158 primary_mapper = other.mapper
1159
1160 assert self.mapper is primary_mapper
1161
1162 our_classes = util.to_set(
1163 mp.class_ for mp in self.with_polymorphic_mappers
1164 )
1165 new_classes = {mp.class_ for mp in other.with_polymorphic_mappers}
1166 if our_classes == new_classes:
1167 return other
1168 else:
1169 classes = our_classes.union(new_classes)
1170
1171 mappers, selectable = primary_mapper._with_polymorphic_args(
1172 classes, None, innerjoin=not other.represents_outer_join
1173 )
1174 selectable = selectable._anonymous_fromclause(flat=True)
1175 return AliasedClass(
1176 primary_mapper,
1177 selectable,
1178 with_polymorphic_mappers=mappers,
1179 with_polymorphic_discriminator=other.polymorphic_on,
1180 use_mapper_path=other._use_mapper_path,
1181 represents_outer_join=other.represents_outer_join,
1182 )._aliased_insp
1183
1184 def _adapt_element(
1185 self, expr: _ORMCOLEXPR, key: Optional[str] = None
1186 ) -> _ORMCOLEXPR:
1187 assert isinstance(expr, ColumnElement)
1188 d: Dict[str, Any] = {
1189 "parententity": self,
1190 "parentmapper": self.mapper,
1191 }
1192 if key:
1193 d["proxy_key"] = key
1194
1195 # IMO mypy should see this one also as returning the same type
1196 # we put into it, but it's not
1197 return (
1198 self._adapter.traverse(expr)
1199 ._annotate(d)
1200 ._set_propagate_attrs(
1201 {"compile_state_plugin": "orm", "plugin_subject": self}
1202 )
1203 )
1204
    if TYPE_CHECKING:
        # Type checkers are shown an explicitly typed stub; this is to
        # establish compatibility with the _ORMAdapterProto protocol,
        # which in turn is compatible with _CoreAdapterProto.

        def _orm_adapt_element(
            self,
            obj: _CE,
            key: Optional[str] = None,
        ) -> _CE: ...

    else:
        # at runtime the name is just another reference to _adapt_element
        _orm_adapt_element = _adapt_element
1217
1218 def _entity_for_mapper(self, mapper):
1219 self_poly = self.with_polymorphic_mappers
1220 if mapper in self_poly:
1221 if mapper is self.mapper:
1222 return self
1223 else:
1224 return getattr(
1225 self.entity, mapper.class_.__name__
1226 )._aliased_insp
1227 elif mapper.isa(self.mapper):
1228 return self
1229 else:
1230 assert False, "mapper %s doesn't correspond to %s" % (mapper, self)
1231
1232 def _memoized_attr__get_clause(self):
1233 onclause, replacemap = self.mapper._get_clause
1234 return (
1235 self._adapter.traverse(onclause),
1236 {
1237 self._adapter.traverse(col): param
1238 for col, param in replacemap.items()
1239 },
1240 )
1241
1242 def _memoized_attr__memoized_values(self):
1243 return {}
1244
1245 def _memoized_attr__all_column_expressions(self):
1246 if self._is_with_polymorphic:
1247 cols_plus_keys = self.mapper._columns_plus_keys(
1248 [ent.mapper for ent in self._with_polymorphic_entities]
1249 )
1250 else:
1251 cols_plus_keys = self.mapper._columns_plus_keys()
1252
1253 cols_plus_keys = [
1254 (key, self._adapt_element(col)) for key, col in cols_plus_keys
1255 ]
1256
1257 return ColumnCollection(cols_plus_keys)
1258
1259 def _memo(self, key, callable_, *args, **kw):
1260 if key in self._memoized_values:
1261 return self._memoized_values[key]
1262 else:
1263 self._memoized_values[key] = value = callable_(*args, **kw)
1264 return value
1265
1266 def __repr__(self):
1267 if self.with_polymorphic_mappers:
1268 with_poly = "(%s)" % ", ".join(
1269 mp.class_.__name__ for mp in self.with_polymorphic_mappers
1270 )
1271 else:
1272 with_poly = ""
1273 return "<AliasedInsp at 0x%x; %s%s>" % (
1274 id(self),
1275 self.class_.__name__,
1276 with_poly,
1277 )
1278
1279 def __str__(self):
1280 if self._is_with_polymorphic:
1281 return "with_polymorphic(%s, [%s])" % (
1282 self._target.__name__,
1283 ", ".join(
1284 mp.class_.__name__
1285 for mp in self.with_polymorphic_mappers
1286 if mp is not self.mapper
1287 ),
1288 )
1289 else:
1290 return "aliased(%s)" % (self._target.__name__,)
1291
1292
class _WrapUserEntity:
    """Proxy handed to the ``loader_criteria`` lambda in place of the
    user's entity.

    It lets attribute access bypass ``declared_attr`` descriptors on
    unmapped mixins, which normally emit a warning for such use.

    Might also be useful for other per-lambda instrumentations should
    the need arise.

    """

    __slots__ = ("subject",)

    def __init__(self, subject):
        self.subject = subject

    @util.preload_module("sqlalchemy.orm.decl_api")
    def __getattribute__(self, name):
        """Resolve ``name`` against the wrapped subject.

        When the subject's own ``__dict__`` holds a ``declared_attr`` under
        ``name``, its ``fget`` is called with the subject directly;
        anything else goes through a plain ``getattr()`` on the subject.

        """
        decl_api = util.preloaded.orm.decl_api

        # plain object lookup, so that this method does not recurse
        wrapped = object.__getattribute__(self, "subject")
        own_attrs = wrapped.__dict__

        if name not in own_attrs or not isinstance(
            own_attrs[name], decl_api.declared_attr
        ):
            return getattr(wrapped, name)
        return own_attrs[name].fget(wrapped)
1319
1320
class LoaderCriteriaOption(CriteriaOption):
    """Add additional WHERE criteria to the load for all occurrences of
    a particular entity.

    :class:`_orm.LoaderCriteriaOption` is invoked using the
    :func:`_orm.with_loader_criteria` function; see that function for
    details.

    .. versionadded:: 1.4

    """

    __slots__ = (
        "root_entity",
        "entity",
        "deferred_where_criteria",
        "where_criteria",
        "_where_crit_orig",
        "include_aliases",
        "propagate_to_loaders",
    )

    _traverse_internals = [
        ("root_entity", visitors.ExtendedInternalTraversal.dp_plain_obj),
        ("entity", visitors.ExtendedInternalTraversal.dp_has_cache_key),
        ("where_criteria", visitors.InternalTraversal.dp_clauseelement),
        ("include_aliases", visitors.InternalTraversal.dp_boolean),
        ("propagate_to_loaders", visitors.InternalTraversal.dp_boolean),
    ]

    root_entity: Optional[Type[Any]]
    entity: Optional[_InternalEntityType[Any]]
    where_criteria: Union[ColumnElement[bool], lambdas.DeferredLambdaElement]
    deferred_where_criteria: bool
    include_aliases: bool
    propagate_to_loaders: bool

    _where_crit_orig: Any

    def __init__(
        self,
        entity_or_base: _EntityType[Any],
        where_criteria: Union[
            _ColumnExpressionArgument[bool],
            Callable[[Any], _ColumnExpressionArgument[bool]],
        ],
        loader_only: bool = False,
        include_aliases: bool = False,
        propagate_to_loaders: bool = True,
        track_closure_variables: bool = True,
    ):
        """Construct the option.

        :param entity_or_base: an inspectable entity, stored as
         ``self.entity``; anything that cannot be inspected is stored as
         ``self.root_entity`` instead.
        :param where_criteria: a SQL expression, or a callable that is
         wrapped in a ``DeferredLambdaElement`` and resolved later.
        :param loader_only: accepted but not used by this constructor.
        :param include_aliases: stored as ``self.include_aliases``.
        :param propagate_to_loaders: stored as
         ``self.propagate_to_loaders``.
        :param track_closure_variables: forwarded to ``LambdaOptions`` when
         ``where_criteria`` is callable.

        """
        inspected = cast(
            "_InternalEntityType[Any]",
            inspection.inspect(entity_or_base, False),
        )
        if inspected is not None:
            self.root_entity = None
            self.entity = inspected
        else:
            self.root_entity = cast("Type[Any]", entity_or_base)
            self.entity = None

        # the caller's original criteria is what __reduce__ hands back
        self._where_crit_orig = where_criteria

        if not callable(where_criteria):
            self.deferred_where_criteria = False
            self.where_criteria = coercions.expect(
                roles.WhereHavingRole, where_criteria
            )
        else:
            if self.root_entity is None:
                assert inspected is not None
                lambda_subject = inspected.entity
            else:
                lambda_subject = self.root_entity

            self.deferred_where_criteria = True
            self.where_criteria = lambdas.DeferredLambdaElement(
                where_criteria,
                roles.WhereHavingRole,
                lambda_args=(_WrapUserEntity(lambda_subject),),
                opts=lambdas.LambdaOptions(
                    track_closure_variables=track_closure_variables
                ),
            )

        self.include_aliases = include_aliases
        self.propagate_to_loaders = propagate_to_loaders

    @classmethod
    def _unreduce(
        cls, entity, where_criteria, include_aliases, propagate_to_loaders
    ):
        """Rebuild an option from the arguments emitted by ``__reduce__``."""
        return LoaderCriteriaOption(
            entity,
            where_criteria,
            include_aliases=include_aliases,
            propagate_to_loaders=propagate_to_loaders,
        )

    def __reduce__(self):
        """Pickle support: reduce to ``_unreduce`` plus the mapped class
        (or root entity), the original criteria and both flags."""
        target = self.entity.class_ if self.entity else self.root_entity
        unreduce_args = (
            target,
            self._where_crit_orig,
            self.include_aliases,
            self.propagate_to_loaders,
        )
        return (LoaderCriteriaOption._unreduce, unreduce_args)

    def _all_mappers(self) -> Iterator[Mapper[Any]]:
        """Iterate the mappers this option applies to.

        With an inspected entity these are its mapper and that mapper's
        descendants.  With a plain root entity its subclasses are walked
        first-in-first-out; each inspectable subclass contributes its
        mapper and descendants, any other subclass has its own subclasses
        queued.

        """
        if self.entity:
            yield from self.entity.mapper.self_and_descendants
            return

        assert self.root_entity
        pending = list(self.root_entity.__subclasses__())
        while pending:
            candidate = pending.pop(0)
            inspected = cast(
                "_InternalEntityType[Any]",
                inspection.inspect(candidate, raiseerr=False),
            )
            if not inspected:
                pending.extend(candidate.__subclasses__())
            else:
                yield from inspected.mapper.self_and_descendants

    def _should_include(self, compile_state: ORMCompileState) -> bool:
        """Return False only when the statement being compiled carries
        this very option under the ``"for_loader_criteria"`` annotation."""
        owner = compile_state.select_statement._annotations.get(
            "for_loader_criteria", None
        )
        return owner is not self

    def _resolve_where_criteria(
        self, ext_info: _InternalEntityType[Any]
    ) -> ColumnElement[bool]:
        """Return the criteria for ``ext_info``, deep-annotated with this
        option under ``"for_loader_criteria"``.

        Deferred (lambda) criteria are first resolved against
        ``ext_info.entity``.

        """
        if not self.deferred_where_criteria:
            crit = self.where_criteria  # type: ignore
        else:
            crit = cast(
                "ColumnElement[bool]",
                self.where_criteria._resolve_with_args(ext_info.entity),
            )
        assert isinstance(crit, ColumnElement)

        return sql_util._deep_annotate(
            crit,
            {"for_loader_criteria": self},
            detect_subquery_cols=True,
            ind_cols_on_fromclause=True,
        )

    def process_compile_state_replaced_entities(
        self,
        compile_state: ORMCompileState,
        mapper_entities: Iterable[_MapperEntity],
    ) -> None:
        """Delegate to :meth:`process_compile_state`; ``mapper_entities``
        is not used."""
        self.process_compile_state(compile_state)

    def process_compile_state(self, compile_state: ORMCompileState) -> None:
        """Apply a modification to a given :class:`.CompileState`."""

        # if options to limit the criteria to immediate query only,
        # use compile_state.attributes instead

        self.get_global_criteria(compile_state.global_attributes)

    def get_global_criteria(self, attributes: Dict[Any, Any]) -> None:
        """Append this option to the list stored in ``attributes`` under
        ``("additional_entity_criteria", mapper)`` for every mapper from
        :meth:`_all_mappers`, creating each list as needed."""
        for mp in self._all_mappers():
            key = ("additional_entity_criteria", mp)
            attributes.setdefault(key, []).append(self)
1498
1499
# the callable handed to inspection._inspects() for AliasedClass returns
# the target's ``_aliased_insp`` attribute
inspection._inspects(AliasedClass)(lambda target: target._aliased_insp)
1501
1502
@inspection._inspects(type)
def _inspect_mc(
    class_: Type[_O],
) -> Optional[Mapper[_O]]:
    """Return the mapper of ``class_``, or None.

    None is returned when the class has no class manager, when the manager
    reports the class as not mapped, or when ``exc.NO_STATE`` is raised
    during the lookup.

    """
    try:
        manager = opt_manager_of_class(class_)
        if manager is not None and manager.is_mapped:
            return manager.mapper
    except exc.NO_STATE:
        pass
    return None
1516
1517
# runtime type of a subscripted ``typing`` generic, taken from ``List[Any]``
GenericAlias = type(List[Any])
1519
1520
@inspection._inspects(GenericAlias)
def _inspect_generic_alias(
    class_: Type[_O],
) -> Optional[Mapper[_O]]:
    """Inspect a subscripted generic by inspecting its origin class.

    The origin is obtained with ``get_origin()`` and passed to
    :func:`_inspect_mc`, whose result is returned as is.

    """
    unsubscripted = cast("Type[_O]", get_origin(class_))
    mapper = _inspect_mc(unsubscripted)
    return mapper
1527
1528
@inspection._self_inspects
class Bundle(
    ORMColumnsClauseRole[_T],
    SupportsCloneAnnotations,
    MemoizedHasCacheKey,
    inspection.Inspectable["Bundle[_T]"],
    InspectionAttr,
):
    """A grouping of SQL expressions that are returned by a :class:`.Query`
    under one namespace.

    The :class:`.Bundle` essentially allows nesting of the tuple-based
    results returned by a column-oriented :class:`_query.Query` object.
    It also
    is extensible via simple subclassing, where the primary capability
    to override is that of how the set of expressions should be returned,
    allowing post-processing as well as custom return types, without
    involving ORM identity-mapped classes.

    .. seealso::

        :ref:`bundles`


    """

    single_entity = False
    """If True, queries for a single Bundle will be returned as a single
    entity, rather than an element within a keyed tuple."""

    is_clause_element = False

    is_mapper = False

    is_aliased_class = False

    is_bundle = True

    _propagate_attrs: _PropagateAttrsType = util.immutabledict()

    proxy_set = util.EMPTY_SET  # type: ignore

    exprs: List[_ColumnsClauseElement]

    def __init__(
        self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
    ):
        r"""Construct a new :class:`.Bundle`.

        e.g.::

            bn = Bundle("mybundle", MyClass.x, MyClass.y)

            for row in session.query(bn).filter(bn.c.x == 5).filter(bn.c.y == 4):
                print(row.mybundle.x, row.mybundle.y)

        :param name: name of the bundle.
        :param \*exprs: columns or SQL expressions comprising the bundle.
        :param single_entity=False: if True, rows for this :class:`.Bundle`
         can be returned as a "single entity" outside of any enclosing tuple
         in the same manner as a mapped entity.

        """  # noqa: E501
        self.name = self._label = name
        coerced_exprs = [
            coercions.expect(
                roles.ColumnsClauseRole, expr, apply_propagate_attrs=self
            )
            for expr in exprs
        ]
        self.exprs = coerced_exprs

        # a nested Bundle is found under the "bundle" annotation of its
        # coerced expression; an element without a ``key`` attribute is
        # keyed by its ``_label``
        self.c = self.columns = ColumnCollection(
            (getattr(col, "key", col._label), col)
            for col in [e._annotations.get("bundle", e) for e in coerced_exprs]
        ).as_readonly()
        self.single_entity = kw.pop("single_entity", self.single_entity)

    def _gen_cache_key(
        self, anon_map: anon_map, bindparams: List[BindParameter[Any]]
    ) -> Tuple[Any, ...]:
        """Return a cache key made of the class, the name, the
        ``single_entity`` flag and the cache key of every expression."""
        return (self.__class__, self.name, self.single_entity) + tuple(
            [expr._gen_cache_key(anon_map, bindparams) for expr in self.exprs]
        )

    @property
    def mapper(self) -> Optional[Mapper[Any]]:
        """The ``"parentmapper"`` annotation of the first expression, or
        None if it has none."""
        mp: Optional[Mapper[Any]] = self.exprs[0]._annotations.get(
            "parentmapper", None
        )
        return mp

    @property
    def entity(self) -> Optional[_InternalEntityType[Any]]:
        """The ``"parententity"`` annotation of the first expression, or
        None if it has none."""
        ie: Optional[_InternalEntityType[Any]] = self.exprs[
            0
        ]._annotations.get("parententity", None)
        return ie

    @property
    def entity_namespace(
        self,
    ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]:
        """The namespace used for attribute lookups; same as :attr:`.c`."""
        return self.c

    columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]

    """A namespace of SQL expressions referred to by this :class:`.Bundle`.

        e.g.::

            bn = Bundle("mybundle", MyClass.x, MyClass.y)

            q = sess.query(bn).filter(bn.c.x == 5)

        Nesting of bundles is also supported::

            b1 = Bundle(
                "b1",
                Bundle("b2", MyClass.a, MyClass.b),
                Bundle("b3", MyClass.x, MyClass.y),
            )

            q = sess.query(b1).filter(b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9)

    .. seealso::

        :attr:`.Bundle.c`

    """  # noqa: E501

    c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
    """An alias for :attr:`.Bundle.columns`."""

    def _clone(self, **kw):
        """Return a shallow copy: a new instance of the same class that
        shares this object's ``__dict__`` contents."""
        cloned = self.__class__.__new__(self.__class__)
        cloned.__dict__.update(self.__dict__)
        return cloned

    def __clause_element__(self):
        """Return a ``ClauseList`` of the bundle's expressions, annotated
        with this bundle and set up for the "orm" compile state plugin."""
        # ensure existing entity_namespace remains
        annotations = {"bundle": self, "entity_namespace": self}
        annotations.update(self._annotations)

        plugin_subject = self.exprs[0]._propagate_attrs.get(
            "plugin_subject", self.entity
        )
        return (
            expression.ClauseList(
                _literal_as_text_role=roles.ColumnsClauseRole,
                group=False,
                *[e._annotations.get("bundle", e) for e in self.exprs],
            )
            ._annotate(annotations)
            ._set_propagate_attrs(
                # the Bundle *must* use the orm plugin no matter what.  the
                # subject can be None but it's much better if it's not.
                {
                    "compile_state_plugin": "orm",
                    "plugin_subject": plugin_subject,
                }
            )
        )

    @property
    def clauses(self):
        """The ``clauses`` of :meth:`__clause_element__`."""
        return self.__clause_element__().clauses

    def label(self, name):
        """Provide a copy of this :class:`.Bundle` passing a new label.

        Both ``name`` and ``_label`` of the copy are set, as ``__init__``
        does.  ``_label`` is the key under which a bundle is stored when it
        is nested inside another bundle, so leaving it at the old name
        would make the enclosing bundle's ``.c`` disagree with the new
        name.

        """

        cloned = self._clone()
        cloned.name = cloned._label = name
        return cloned

    def create_row_processor(
        self,
        query: Select[Any],
        procs: Sequence[Callable[[Row[Any]], Any]],
        labels: Sequence[str],
    ) -> Callable[[Row[Any]], Any]:
        """Produce the "row processing" function for this :class:`.Bundle`.

        May be overridden by subclasses to provide custom behaviors when
        results are fetched. The method is passed the statement object and a
        set of "row processor" functions at query execution time; these
        processor functions when given a result row will return the individual
        attribute value, which can then be adapted into any kind of return data
        structure.

        The example below illustrates replacing the usual :class:`.Row`
        return structure with a straight Python dictionary::

            from sqlalchemy.orm import Bundle


            class DictBundle(Bundle):
                def create_row_processor(self, query, procs, labels):
                    "Override create_row_processor to return values as dictionaries"

                    def proc(row):
                        return dict(zip(labels, (proc(row) for proc in procs)))

                    return proc

        A result from the above :class:`_orm.Bundle` will return dictionary
        values::

            bn = DictBundle("mybundle", MyClass.data1, MyClass.data2)
            for row in session.execute(select(bn).where(bn.c.data1 == "d1")):
                print(row.mybundle["data1"], row.mybundle["data2"])

        """  # noqa: E501
        # one empty tuple per label is passed alongside the labels
        keyed_tuple = result_tuple(labels, [() for l in labels])

        def proc(row: Row[Any]) -> Any:
            return keyed_tuple([proc(row) for proc in procs])

        return proc
1748
1749
def _orm_annotate(element: _SA, exclude: Optional[Any] = None) -> _SA:
    """Return a deep copy of ``element`` in which every element carries
    the ``"_orm_adapt"`` annotation.

    Members of the ``exclude`` collection are cloned as well but left
    without the annotation.

    """
    orm_adapt_flag = {"_orm_adapt": True}
    return sql_util._deep_annotate(element, orm_adapt_flag, exclude)
1758
1759
def _orm_deannotate(element: _SA) -> _SA:
    """Strip the annotations that tie a column to a particular mapping,
    namely ``"_orm_adapt"`` and ``"parententity"``.

    The "remote" and "foreign" annotations applied by
    :func:`_orm.foreign` and :func:`_orm.remote` are not affected.

    """
    mapping_annotations = ("_orm_adapt", "parententity")
    return sql_util._deep_deannotate(element, values=mapping_annotations)
1772
1773
def _orm_full_deannotate(element: _SA) -> _SA:
    """Return ``element`` passed through ``sql_util._deep_deannotate()``
    without restricting which annotations are removed."""
    deannotated = sql_util._deep_deannotate(element)
    return deannotated
1776
1777
class _ORMJoin(expression.Join):
    """Extend Join to support ORM constructs as input."""

    __visit_name__ = expression.Join.__visit_name__

    inherit_cache = True

    def __init__(
        self,
        left: _FromClauseArgument,
        right: _FromClauseArgument,
        onclause: Optional[_OnClauseArgument] = None,
        isouter: bool = False,
        full: bool = False,
        _left_memo: Optional[Any] = None,
        _right_memo: Optional[Any] = None,
        _extra_criteria: Tuple[ColumnElement[bool], ...] = (),
    ):
        """Construct a join between two ORM-aware FROM elements.

        :param left: left side of the join; passed through
         ``inspection.inspect()``.
        :param right: right side of the join; passed through
         ``inspection.inspect()``.
        :param onclause: a ``QueryableAttribute`` or ``MapperProperty``
         (in which case the join conditions are produced by the property's
         ``_create_joins()``), or any other value, which is handed to
         ``Join.__init__`` unchanged.
        :param isouter: passed to ``Join.__init__``; with a secondary
         table it also decides on which side the secondary is joined.
        :param full: passed to ``Join.__init__``.
        :param _left_memo: stored as ``self._left_memo``.
        :param _right_memo: stored as ``self._right_memo``.
        :param _extra_criteria: additional criteria; given to
         ``_create_joins()`` when a property is in use, otherwise ANDed
         onto the ON clause after construction.

        """
        left_info = cast(
            "Union[FromClause, _InternalEntityType[Any]]",
            inspection.inspect(left),
        )

        right_info = cast(
            "Union[FromClause, _InternalEntityType[Any]]",
            inspection.inspect(right),
        )
        adapt_to = right_info.selectable

        # used by joined eager loader
        self._left_memo = _left_memo
        self._right_memo = _right_memo

        # work out the relationship property, if any, and the selectable
        # the ON clause originates from
        if isinstance(onclause, attributes.QueryableAttribute):
            if TYPE_CHECKING:
                assert isinstance(
                    onclause.comparator, RelationshipProperty.Comparator
                )
            on_selectable = onclause.comparator._source_selectable()
            prop = onclause.property
            # criteria carried by the attribute are added to the caller's
            _extra_criteria += onclause._extra_criteria
        elif isinstance(onclause, MapperProperty):
            # used internally by joined eager loader...possibly not ideal
            prop = onclause
            on_selectable = prop.parent.selectable
        else:
            prop = None
            on_selectable = None

        left_selectable = left_info.selectable
        if prop:
            adapt_from: Optional[FromClause]
            # adapt from the ON clause's own selectable when it is present
            # in the left side, else from the left selectable itself
            if sql_util.clause_is_present(on_selectable, left_selectable):
                adapt_from = on_selectable
            else:
                assert isinstance(left_selectable, FromClause)
                adapt_from = left_selectable

            (
                pj,
                sj,
                source,
                dest,
                secondary,
                target_adapter,
            ) = prop._create_joins(
                source_selectable=adapt_from,
                dest_selectable=adapt_to,
                source_polymorphic=True,
                of_type_entity=right_info,
                alias_secondary=True,
                extra_criteria=_extra_criteria,
            )

            if sj is not None:
                # a secondary join condition was returned, so the
                # secondary table is spliced into one side of the join
                if isouter:
                    # note this is an inner join from secondary->right
                    right = sql.join(secondary, right, sj)
                    onclause = pj
                else:
                    left = sql.join(left, secondary, pj, isouter)
                    onclause = sj
            else:
                onclause = pj

            self._target_adapter = target_adapter

        # we don't use the normal coercions logic for _ORMJoin
        # (probably should), so do some gymnastics to get the entity.
        # logic here is for #8721, which was a major bug in 1.4
        # for almost two years, not reported/fixed until 1.4.43 (!)
        if is_selectable(left_info):
            parententity = left_selectable._annotations.get(
                "parententity", None
            )
        elif insp_is_mapper(left_info) or insp_is_aliased_class(left_info):
            parententity = left_info
        else:
            parententity = None

        if parententity is not None:
            self._annotations = self._annotations.union(
                {"parententity": parententity}
            )

        # extra criteria is only ANDed on below when no property took part;
        # with a property it was already given to _create_joins()
        augment_onclause = bool(_extra_criteria) and not prop
        expression.Join.__init__(self, left, right, onclause, isouter, full)

        assert self.onclause is not None

        if augment_onclause:
            self.onclause &= sql.and_(*_extra_criteria)

        if (
            not prop
            and getattr(right_info, "mapper", None)
            and right_info.mapper.single  # type: ignore
        ):
            right_info = cast("_InternalEntityType[Any]", right_info)
            # if single inheritance target and we are using a manual
            # or implicit ON clause, augment it the same way we'd augment the
            # WHERE.
            single_crit = right_info.mapper._single_table_criterion
            if single_crit is not None:
                if insp_is_aliased_class(right_info):
                    # adapt the criterion to the alias before applying it
                    single_crit = right_info._adapter.traverse(single_crit)
                self.onclause = self.onclause & single_crit

    def _splice_into_center(self, other):
        """Splice a join into the center.

        Given join(a, b) and join(b, c), return join(a, b).join(c)

        """
        # descend to the leftmost non-join element of ``other``; it must be
        # the very object that is the right side of this join
        leftmost = other
        while isinstance(leftmost, sql.Join):
            leftmost = leftmost.left

        assert self.right is leftmost

        left = _ORMJoin(
            self.left,
            other.left,
            self.onclause,
            isouter=self.isouter,
            _left_memo=self._left_memo,
            _right_memo=other._left_memo._path_registry,
        )

        return _ORMJoin(
            left,
            other.right,
            other.onclause,
            isouter=other.isouter,
            _right_memo=other._right_memo,
        )

    def join(
        self,
        right: _FromClauseArgument,
        onclause: Optional[_OnClauseArgument] = None,
        isouter: bool = False,
        full: bool = False,
    ) -> _ORMJoin:
        """Return a new :class:`._ORMJoin` with this join as its left
        side."""
        return _ORMJoin(self, right, onclause, full=full, isouter=isouter)

    def outerjoin(
        self,
        right: _FromClauseArgument,
        onclause: Optional[_OnClauseArgument] = None,
        full: bool = False,
    ) -> _ORMJoin:
        """Return a new :class:`._ORMJoin` with this join as its left
        side and ``isouter=True``."""
        return _ORMJoin(self, right, onclause, isouter=True, full=full)
1951
1952
def with_parent(
    instance: object,
    prop: attributes.QueryableAttribute[Any],
    from_entity: Optional[_EntityType[Any]] = None,
) -> ColumnElement[bool]:
    """Create filtering criterion that relates this query's primary entity
    to the given related instance, using established
    :func:`_orm.relationship()`
    configuration.

    E.g.::

        stmt = select(Address).where(with_parent(some_user, User.addresses))

    The SQL rendered is the same as that rendered when a lazy loader
    would fire off from the given parent on that attribute, meaning
    that the appropriate state is taken from the parent object in
    Python without the need to render joins to the parent table
    in the rendered statement.

    The given property may also make use of :meth:`_orm.PropComparator.of_type`
    to indicate the left side of the criteria::


        a1 = aliased(Address)
        a2 = aliased(Address)
        stmt = select(a1, a2).where(with_parent(u1, User.addresses.of_type(a2)))

    The above use is equivalent to using the ``from_entity`` argument::

        a1 = aliased(Address)
        a2 = aliased(Address)
        stmt = select(a1, a2).where(
            with_parent(u1, User.addresses, from_entity=a2)
        )

    :param instance:
      An instance which has some :func:`_orm.relationship`.

    :param prop:
      Class-bound attribute, which indicates
      what relationship from the instance should be used to reconcile the
      parent/child relationship.  Strings are rejected.

    :param from_entity:
      Entity in which to consider as the left side.  This defaults to the
      "zero" entity of the :class:`_query.Query` itself.  When ``prop``
      carries an ``of_type()`` entity, that entity is used instead.

      .. versionadded:: 1.2

    """  # noqa: E501
    if isinstance(prop, str):
        raise sa_exc.ArgumentError(
            "with_parent() accepts class-bound mapped attributes, not strings"
        )

    prop_t: RelationshipProperty[Any]

    if not isinstance(prop, attributes.QueryableAttribute):
        # anything else is used directly as the relationship property
        prop_t = prop
    else:
        if prop._of_type:
            from_entity = prop._of_type

        mapper_property = prop.property
        is_relationship = mapper_property is not None and prop_is_relationship(
            mapper_property
        )
        if not is_relationship:
            raise sa_exc.ArgumentError(
                f"Expected relationship property for with_parent(), "
                f"got {mapper_property}"
            )
        prop_t = mapper_property

    return prop_t._with_parent(instance, from_entity=from_entity)
2027
2028
def has_identity(object_: object) -> bool:
    """Return True if the given object has a database
    identity.

    This typically corresponds to the object being
    in either the persistent or detached state.  The value is the
    ``has_identity`` attribute of the object's instance state.

    .. seealso::

        :func:`.was_deleted`

    """
    return attributes.instance_state(object_).has_identity
2043
2044
def was_deleted(object_: object) -> bool:
    """Return True if the given object was deleted
    within a session flush.

    This is regardless of whether or not the object is
    persistent or detached.  The value is the ``was_deleted`` attribute of
    the object's instance state.

    .. seealso::

        :attr:`.InstanceState.was_deleted`

    """
    return attributes.instance_state(object_).was_deleted
2060
2061
def _entity_corresponds_to(
    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
) -> bool:
    """determine if 'given' corresponds to 'entity', in terms
    of an entity passed to Query that would match the same entity
    being referred to elsewhere in the query.

    Two aliased entities correspond when they share the same base alias.
    An aliased 'given' against a mapper 'entity' corresponds through its
    polymorphic mappers when it uses the mapper path, otherwise only by
    identity.  Two mappers correspond when they have a common parent.

    """
    if insp_is_aliased_class(entity):
        if not insp_is_aliased_class(given):
            return False
        return entity._base_alias() is given._base_alias()

    if insp_is_aliased_class(given):
        if not given._use_mapper_path:
            return entity is given
        return entity in given.with_polymorphic_mappers

    assert insp_is_mapper(given)
    return entity.common_parent(given)
2083
2084
def _entity_corresponds_to_use_path_impl(
    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
) -> bool:
    """Decide whether 'given' corresponds to 'entity', in the sense of a
    path of loader options where a mapped attribute is taken to be a
    member of a parent entity.

    e.g.::

        someoption(A).someoption(A.b)  # -> fn(A, A) -> True
        someoption(A).someoption(C.d)  # -> fn(A, C) -> False

        a1 = aliased(A)
        someoption(a1).someoption(A.b)  # -> fn(a1, A) -> False
        someoption(a1).someoption(a1.b)  # -> fn(a1, a1) -> True

        wp = with_polymorphic(A, [A1, A2])
        someoption(wp).someoption(A1.foo)  # -> fn(wp, A1) -> False
        someoption(wp).someoption(wp.A1.foo)  # -> fn(wp, wp.A1) -> True

    """
    if not insp_is_aliased_class(given):
        # 'given' is not an aliased class
        if insp_is_aliased_class(entity):
            # an aliased 'entity' matches only when it uses the mapper
            # path and lists 'given' among its polymorphic mappers
            return (
                entity._use_mapper_path
                and given in entity.with_polymorphic_mappers
            )
        return given.isa(entity.mapper)

    # 'given' is an aliased class: 'entity' must be an aliased class that
    # does not use the mapper path ...
    if not insp_is_aliased_class(entity) or entity._use_mapper_path:
        return False

    # ... and be either the same alias or one of its sub-entities
    return given is entity or entity in given._with_polymorphic_entities
2119
2120
2121def _entity_isa(given: _InternalEntityType[Any], mapper: Mapper[Any]) -> bool:
2122 """determine if 'given' "is a" mapper, in terms of the given
2123 would load rows of type 'mapper'.
2124
2125 """
2126 if given.is_aliased_class:
2127 return mapper in given.with_polymorphic_mappers or given.mapper.isa(
2128 mapper
2129 )
2130 elif given.with_polymorphic_mappers:
2131 return mapper in given.with_polymorphic_mappers or given.isa(mapper)
2132 else:
2133 return given.isa(mapper)
2134
2135
2136def _getitem(iterable_query: Query[Any], item: Any) -> Any:
2137 """calculate __getitem__ in terms of an iterable query object
2138 that also has a slice() method.
2139
2140 """
2141
2142 def _no_negative_indexes():
2143 raise IndexError(
2144 "negative indexes are not accepted by SQL "
2145 "index / slice operators"
2146 )
2147
2148 if isinstance(item, slice):
2149 start, stop, step = util.decode_slice(item)
2150
2151 if (
2152 isinstance(stop, int)
2153 and isinstance(start, int)
2154 and stop - start <= 0
2155 ):
2156 return []
2157
2158 elif (isinstance(start, int) and start < 0) or (
2159 isinstance(stop, int) and stop < 0
2160 ):
2161 _no_negative_indexes()
2162
2163 res = iterable_query.slice(start, stop)
2164 if step is not None:
2165 return list(res)[None : None : item.step]
2166 else:
2167 return list(res)
2168 else:
2169 if item == -1:
2170 _no_negative_indexes()
2171 else:
2172 return list(iterable_query[item : item + 1])[0]
2173
2174
def _is_mapped_annotation(
    raw_annotation: _AnnotationScanType,
    cls: Type[Any],
    originating_cls: Type[Any],
) -> bool:
    """Return True if the annotation resolves to a type whose origin is
    ``_MappedAnnotationBase``, as judged by ``is_origin_of_cls()``.

    The annotation is de-stringified using the module of
    ``originating_cls``.  An annotation that cannot be resolved
    (``NameError``) is reported as False.

    """
    try:
        resolved = de_stringify_annotation(
            cls, raw_annotation, originating_cls.__module__
        )
    except NameError:
        # raising here would be more accurate in most cases, at least
        # within our own tests, since it prevents false negatives.
        # In the real world, however, try to avoid getting involved with
        # end-user annotations that have nothing to do with us.
        # see issue #8888 where we bypass using this function in the case
        # that we want to detect an unresolvable Mapped[] type.
        return False

    return is_origin_of_cls(resolved, _MappedAnnotationBase)
2194
2195
2196class _CleanupError(Exception):
2197 pass
2198
2199
2200def _cleanup_mapped_str_annotation(
2201 annotation: str, originating_module: str
2202) -> str:
2203 # fix up an annotation that comes in as the form:
2204 # 'Mapped[List[Address]]' so that it instead looks like:
2205 # 'Mapped[List["Address"]]' , which will allow us to get
2206 # "Address" as a string
2207
2208 # additionally, resolve symbols for these names since this is where
2209 # we'd have to do it
2210
2211 inner: Optional[Match[str]]
2212
2213 mm = re.match(r"^([^ \|]+?)\[(.+)\]$", annotation)
2214
2215 if not mm:
2216 return annotation
2217
2218 # ticket #8759. Resolve the Mapped name to a real symbol.
2219 # originally this just checked the name.
2220 try:
2221 obj = eval_name_only(mm.group(1), originating_module)
2222 except NameError as ne:
2223 raise _CleanupError(
2224 f'For annotation "{annotation}", could not resolve '
2225 f'container type "{mm.group(1)}". '
2226 "Please ensure this type is imported at the module level "
2227 "outside of TYPE_CHECKING blocks"
2228 ) from ne
2229
2230 if obj is typing.ClassVar:
2231 real_symbol = "ClassVar"
2232 else:
2233 try:
2234 if issubclass(obj, _MappedAnnotationBase):
2235 real_symbol = obj.__name__
2236 else:
2237 return annotation
2238 except TypeError:
2239 # avoid isinstance(obj, type) check, just catch TypeError
2240 return annotation
2241
2242 # note: if one of the codepaths above didn't define real_symbol and
2243 # then didn't return, real_symbol raises UnboundLocalError
2244 # which is actually a NameError, and the calling routines don't
2245 # notice this since they are catching NameError anyway. Just in case
2246 # this is being modified in the future, something to be aware of.
2247
2248 stack = []
2249 inner = mm
2250 while True:
2251 stack.append(real_symbol if mm is inner else inner.group(1))
2252 g2 = inner.group(2)
2253 inner = re.match(r"^([^ \|]+?)\[(.+)\]$", g2)
2254 if inner is None:
2255 stack.append(g2)
2256 break
2257
2258 # stacks we want to rewrite, that is, quote the last entry which
2259 # we think is a relationship class name:
2260 #
2261 # ['Mapped', 'List', 'Address']
2262 # ['Mapped', 'A']
2263 #
2264 # stacks we dont want to rewrite, which are generally MappedColumn
2265 # use cases:
2266 #
2267 # ['Mapped', "'Optional[Dict[str, str]]'"]
2268 # ['Mapped', 'dict[str, str] | None']
2269
2270 if (
2271 # avoid already quoted symbols such as
2272 # ['Mapped', "'Optional[Dict[str, str]]'"]
2273 not re.match(r"""^["'].*["']$""", stack[-1])
2274 # avoid further generics like Dict[] such as
2275 # ['Mapped', 'dict[str, str] | None'],
2276 # ['Mapped', 'list[int] | list[str]'],
2277 # ['Mapped', 'Union[list[int], list[str]]'],
2278 and not re.search(r"[\[\]]", stack[-1])
2279 ):
2280 stripchars = "\"' "
2281 stack[-1] = ", ".join(
2282 f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",")
2283 )
2284
2285 annotation = "[".join(stack) + ("]" * (len(stack) - 1))
2286
2287 return annotation
2288
2289
2290def _extract_mapped_subtype(
2291 raw_annotation: Optional[_AnnotationScanType],
2292 cls: type,
2293 originating_module: str,
2294 key: str,
2295 attr_cls: Type[Any],
2296 required: bool,
2297 is_dataclass_field: bool,
2298 expect_mapped: bool = True,
2299 raiseerr: bool = True,
2300) -> Optional[Tuple[Union[_AnnotationScanType, str], Optional[type]]]:
2301 """given an annotation, figure out if it's ``Mapped[something]`` and if
2302 so, return the ``something`` part.
2303
2304 Includes error raise scenarios and other options.
2305
2306 """
2307
2308 if raw_annotation is None:
2309 if required:
2310 raise orm_exc.MappedAnnotationError(
2311 f"Python typing annotation is required for attribute "
2312 f'"{cls.__name__}.{key}" when primary argument(s) for '
2313 f'"{attr_cls.__name__}" construct are None or not present'
2314 )
2315 return None
2316
2317 try:
2318 # destringify the "outside" of the annotation. note we are not
2319 # adding include_generic so it will *not* dig into generic contents,
2320 # which will remain as ForwardRef or plain str under future annotations
2321 # mode. The full destringify happens later when mapped_column goes
2322 # to do a full lookup in the registry type_annotations_map.
2323 annotated = de_stringify_annotation(
2324 cls,
2325 raw_annotation,
2326 originating_module,
2327 str_cleanup_fn=_cleanup_mapped_str_annotation,
2328 )
2329 except _CleanupError as ce:
2330 raise orm_exc.MappedAnnotationError(
2331 f"Could not interpret annotation {raw_annotation}. "
2332 "Check that it uses names that are correctly imported at the "
2333 "module level. See chained stack trace for more hints."
2334 ) from ce
2335 except NameError as ne:
2336 if raiseerr and "Mapped[" in raw_annotation: # type: ignore
2337 raise orm_exc.MappedAnnotationError(
2338 f"Could not interpret annotation {raw_annotation}. "
2339 "Check that it uses names that are correctly imported at the "
2340 "module level. See chained stack trace for more hints."
2341 ) from ne
2342
2343 annotated = raw_annotation # type: ignore
2344
2345 if is_dataclass_field:
2346 return annotated, None
2347 else:
2348 if not hasattr(annotated, "__origin__") or not is_origin_of_cls(
2349 annotated, _MappedAnnotationBase
2350 ):
2351 if expect_mapped:
2352 if not raiseerr:
2353 return None
2354
2355 origin = getattr(annotated, "__origin__", None)
2356 if origin is typing.ClassVar:
2357 return None
2358
2359 # check for other kind of ORM descriptor like AssociationProxy,
2360 # don't raise for that (issue #9957)
2361 elif isinstance(origin, type) and issubclass(
2362 origin, ORMDescriptor
2363 ):
2364 return None
2365
2366 raise orm_exc.MappedAnnotationError(
2367 f'Type annotation for "{cls.__name__}.{key}" '
2368 "can't be correctly interpreted for "
2369 "Annotated Declarative Table form. ORM annotations "
2370 "should normally make use of the ``Mapped[]`` generic "
2371 "type, or other ORM-compatible generic type, as a "
2372 "container for the actual type, which indicates the "
2373 "intent that the attribute is mapped. "
2374 "Class variables that are not intended to be mapped "
2375 "by the ORM should use ClassVar[]. "
2376 "To allow Annotated Declarative to disregard legacy "
2377 "annotations which don't use Mapped[] to pass, set "
2378 '"__allow_unmapped__ = True" on the class or a '
2379 "superclass this class.",
2380 code="zlpr",
2381 )
2382
2383 else:
2384 return annotated, None
2385
2386 if len(annotated.__args__) != 1:
2387 raise orm_exc.MappedAnnotationError(
2388 "Expected sub-type for Mapped[] annotation"
2389 )
2390
2391 return (
2392 # fix dict/list/set args to be ForwardRef, see #11814
2393 fixup_container_fwd_refs(annotated.__args__[0]),
2394 annotated.__origin__,
2395 )
2396
2397
def _mapper_property_as_plain_name(prop: Type[Any]) -> str:
    """Return the plain name for ``prop`` as produced by
    ``util.clsname_as_plain_name()``.

    If ``prop`` has a ``_mapper_property_name`` attribute, the result of
    calling it is passed along as the name; otherwise ``None`` is passed.

    """
    explicit_name = (
        prop._mapper_property_name()
        if hasattr(prop, "_mapper_property_name")
        else None
    )
    return util.clsname_as_plain_name(prop, explicit_name)