1# orm/util.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9from __future__ import annotations
10
11import enum
12import functools
13import re
14import types
15import typing
16from typing import AbstractSet
17from typing import Any
18from typing import Callable
19from typing import cast
20from typing import Dict
21from typing import FrozenSet
22from typing import Generic
23from typing import Iterable
24from typing import Iterator
25from typing import List
26from typing import Match
27from typing import Optional
28from typing import Protocol
29from typing import Sequence
30from typing import Tuple
31from typing import Type
32from typing import TYPE_CHECKING
33from typing import TypeVar
34from typing import Union
35import weakref
36
37from . import attributes # noqa
38from . import exc
39from ._typing import _O
40from ._typing import insp_is_aliased_class
41from ._typing import insp_is_mapper
42from ._typing import prop_is_relationship
43from .base import _class_to_mapper as _class_to_mapper
44from .base import _MappedAnnotationBase
45from .base import _never_set as _never_set # noqa: F401
46from .base import _none_set as _none_set # noqa: F401
47from .base import attribute_str as attribute_str # noqa: F401
48from .base import class_mapper as class_mapper
49from .base import DynamicMapped
50from .base import InspectionAttr as InspectionAttr
51from .base import instance_str as instance_str # noqa: F401
52from .base import Mapped
53from .base import object_mapper as object_mapper
54from .base import object_state as object_state # noqa: F401
55from .base import opt_manager_of_class
56from .base import ORMDescriptor
57from .base import state_attribute_str as state_attribute_str # noqa: F401
58from .base import state_class_str as state_class_str # noqa: F401
59from .base import state_str as state_str # noqa: F401
60from .base import WriteOnlyMapped
61from .interfaces import CriteriaOption
62from .interfaces import MapperProperty as MapperProperty
63from .interfaces import ORMColumnsClauseRole
64from .interfaces import ORMEntityColumnsClauseRole
65from .interfaces import ORMFromClauseRole
66from .path_registry import PathRegistry as PathRegistry
67from .. import event
68from .. import exc as sa_exc
69from .. import inspection
70from .. import sql
71from .. import util
72from ..engine.result import result_tuple
73from ..sql import coercions
74from ..sql import expression
75from ..sql import lambdas
76from ..sql import roles
77from ..sql import util as sql_util
78from ..sql import visitors
79from ..sql._typing import is_selectable
80from ..sql.annotation import SupportsCloneAnnotations
81from ..sql.base import ColumnCollection
82from ..sql.cache_key import HasCacheKey
83from ..sql.cache_key import MemoizedHasCacheKey
84from ..sql.elements import ColumnElement
85from ..sql.elements import KeyedColumnElement
86from ..sql.selectable import FromClause
87from ..util.langhelpers import MemoizedSlots
88from ..util.typing import de_stringify_annotation as _de_stringify_annotation
89from ..util.typing import (
90 de_stringify_union_elements as _de_stringify_union_elements,
91)
92from ..util.typing import eval_name_only as _eval_name_only
93from ..util.typing import fixup_container_fwd_refs
94from ..util.typing import is_origin_of_cls
95from ..util.typing import Literal
96from ..util.typing import TupleAny
97from ..util.typing import typing_get_origin
98from ..util.typing import Unpack
99
100if typing.TYPE_CHECKING:
101 from ._typing import _EntityType
102 from ._typing import _IdentityKeyType
103 from ._typing import _InternalEntityType
104 from ._typing import _ORMCOLEXPR
105 from .context import _MapperEntity
106 from .context import ORMCompileState
107 from .mapper import Mapper
108 from .path_registry import AbstractEntityRegistry
109 from .query import Query
110 from .relationships import RelationshipProperty
111 from ..engine import Row
112 from ..engine import RowMapping
113 from ..sql._typing import _CE
114 from ..sql._typing import _ColumnExpressionArgument
115 from ..sql._typing import _EquivalentColumnMap
116 from ..sql._typing import _FromClauseArgument
117 from ..sql._typing import _OnClauseArgument
118 from ..sql._typing import _PropagateAttrsType
119 from ..sql.annotation import _SA
120 from ..sql.base import ReadOnlyColumnCollection
121 from ..sql.elements import BindParameter
122 from ..sql.selectable import _ColumnsClauseElement
123 from ..sql.selectable import Select
124 from ..sql.selectable import Selectable
125 from ..sql.visitors import anon_map
126 from ..util.typing import _AnnotationScanType
127 from ..util.typing import ArgsTypeProcotol
128
129_T = TypeVar("_T", bound=Any)
130
131all_cascades = frozenset(
132 (
133 "delete",
134 "delete-orphan",
135 "all",
136 "merge",
137 "expunge",
138 "save-update",
139 "refresh-expire",
140 "none",
141 )
142)
143
144
145_de_stringify_partial = functools.partial(
146 functools.partial,
147 locals_=util.immutabledict(
148 {
149 "Mapped": Mapped,
150 "WriteOnlyMapped": WriteOnlyMapped,
151 "DynamicMapped": DynamicMapped,
152 }
153 ),
154)
155
156# partial is practically useless as we have to write out the whole
157# function and maintain the signature anyway
158
159
160class _DeStringifyAnnotation(Protocol):
161 def __call__(
162 self,
163 cls: Type[Any],
164 annotation: _AnnotationScanType,
165 originating_module: str,
166 *,
167 str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
168 include_generic: bool = False,
169 ) -> Type[Any]: ...
170
171
172de_stringify_annotation = cast(
173 _DeStringifyAnnotation, _de_stringify_partial(_de_stringify_annotation)
174)
175
176
177class _DeStringifyUnionElements(Protocol):
178 def __call__(
179 self,
180 cls: Type[Any],
181 annotation: ArgsTypeProcotol,
182 originating_module: str,
183 *,
184 str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
185 ) -> Type[Any]: ...
186
187
188de_stringify_union_elements = cast(
189 _DeStringifyUnionElements,
190 _de_stringify_partial(_de_stringify_union_elements),
191)
192
193
194class _EvalNameOnly(Protocol):
195 def __call__(self, name: str, module_name: str) -> Any: ...
196
197
198eval_name_only = cast(_EvalNameOnly, _de_stringify_partial(_eval_name_only))
199
200
201class CascadeOptions(FrozenSet[str]):
202 """Keeps track of the options sent to
203 :paramref:`.relationship.cascade`"""
204
205 _add_w_all_cascades = all_cascades.difference(
206 ["all", "none", "delete-orphan"]
207 )
208 _allowed_cascades = all_cascades
209
210 _viewonly_cascades = ["expunge", "all", "none", "refresh-expire", "merge"]
211
212 __slots__ = (
213 "save_update",
214 "delete",
215 "refresh_expire",
216 "merge",
217 "expunge",
218 "delete_orphan",
219 )
220
221 save_update: bool
222 delete: bool
223 refresh_expire: bool
224 merge: bool
225 expunge: bool
226 delete_orphan: bool
227
228 def __new__(
229 cls, value_list: Optional[Union[Iterable[str], str]]
230 ) -> CascadeOptions:
231 if isinstance(value_list, str) or value_list is None:
232 return cls.from_string(value_list) # type: ignore
233 values = set(value_list)
234 if values.difference(cls._allowed_cascades):
235 raise sa_exc.ArgumentError(
236 "Invalid cascade option(s): %s"
237 % ", ".join(
238 [
239 repr(x)
240 for x in sorted(
241 values.difference(cls._allowed_cascades)
242 )
243 ]
244 )
245 )
246
247 if "all" in values:
248 values.update(cls._add_w_all_cascades)
249 if "none" in values:
250 values.clear()
251 values.discard("all")
252
253 self = super().__new__(cls, values)
254 self.save_update = "save-update" in values
255 self.delete = "delete" in values
256 self.refresh_expire = "refresh-expire" in values
257 self.merge = "merge" in values
258 self.expunge = "expunge" in values
259 self.delete_orphan = "delete-orphan" in values
260
261 if self.delete_orphan and not self.delete:
262 util.warn("The 'delete-orphan' cascade option requires 'delete'.")
263 return self
264
265 def __repr__(self):
266 return "CascadeOptions(%r)" % (",".join([x for x in sorted(self)]))
267
268 @classmethod
269 def from_string(cls, arg):
270 values = [c for c in re.split(r"\s*,\s*", arg or "") if c]
271 return cls(values)
272
273
274def _validator_events(desc, key, validator, include_removes, include_backrefs):
275 """Runs a validation method on an attribute value to be set or
276 appended.
277 """
278
279 if not include_backrefs:
280
281 def detect_is_backref(state, initiator):
282 impl = state.manager[key].impl
283 return initiator.impl is not impl
284
285 if include_removes:
286
287 def append(state, value, initiator):
288 if initiator.op is not attributes.OP_BULK_REPLACE and (
289 include_backrefs or not detect_is_backref(state, initiator)
290 ):
291 return validator(state.obj(), key, value, False)
292 else:
293 return value
294
295 def bulk_set(state, values, initiator):
296 if include_backrefs or not detect_is_backref(state, initiator):
297 obj = state.obj()
298 values[:] = [
299 validator(obj, key, value, False) for value in values
300 ]
301
302 def set_(state, value, oldvalue, initiator):
303 if include_backrefs or not detect_is_backref(state, initiator):
304 return validator(state.obj(), key, value, False)
305 else:
306 return value
307
308 def remove(state, value, initiator):
309 if include_backrefs or not detect_is_backref(state, initiator):
310 validator(state.obj(), key, value, True)
311
312 else:
313
314 def append(state, value, initiator):
315 if initiator.op is not attributes.OP_BULK_REPLACE and (
316 include_backrefs or not detect_is_backref(state, initiator)
317 ):
318 return validator(state.obj(), key, value)
319 else:
320 return value
321
322 def bulk_set(state, values, initiator):
323 if include_backrefs or not detect_is_backref(state, initiator):
324 obj = state.obj()
325 values[:] = [validator(obj, key, value) for value in values]
326
327 def set_(state, value, oldvalue, initiator):
328 if include_backrefs or not detect_is_backref(state, initiator):
329 return validator(state.obj(), key, value)
330 else:
331 return value
332
333 event.listen(desc, "append", append, raw=True, retval=True)
334 event.listen(desc, "bulk_replace", bulk_set, raw=True)
335 event.listen(desc, "set", set_, raw=True, retval=True)
336 if include_removes:
337 event.listen(desc, "remove", remove, raw=True, retval=True)
338
339
340def polymorphic_union(
341 table_map, typecolname, aliasname="p_union", cast_nulls=True
342):
343 """Create a ``UNION`` statement used by a polymorphic mapper.
344
345 See :ref:`concrete_inheritance` for an example of how
346 this is used.
347
348 :param table_map: mapping of polymorphic identities to
349 :class:`_schema.Table` objects.
350 :param typecolname: string name of a "discriminator" column, which will be
351 derived from the query, producing the polymorphic identity for
352 each row. If ``None``, no polymorphic discriminator is generated.
353 :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()`
354 construct generated.
355 :param cast_nulls: if True, non-existent columns, which are represented
356 as labeled NULLs, will be passed into CAST. This is a legacy behavior
357 that is problematic on some backends such as Oracle - in which case it
358 can be set to False.
359
360 """
361
362 colnames: util.OrderedSet[str] = util.OrderedSet()
363 colnamemaps = {}
364 types = {}
365 for key in table_map:
366 table = table_map[key]
367
368 table = coercions.expect(
369 roles.StrictFromClauseRole, table, allow_select=True
370 )
371 table_map[key] = table
372
373 m = {}
374 for c in table.c:
375 if c.key == typecolname:
376 raise sa_exc.InvalidRequestError(
377 "Polymorphic union can't use '%s' as the discriminator "
378 "column due to mapped column %r; please apply the "
379 "'typecolname' "
380 "argument; this is available on "
381 "ConcreteBase as '_concrete_discriminator_name'"
382 % (typecolname, c)
383 )
384 colnames.add(c.key)
385 m[c.key] = c
386 types[c.key] = c.type
387 colnamemaps[table] = m
388
389 def col(name, table):
390 try:
391 return colnamemaps[table][name]
392 except KeyError:
393 if cast_nulls:
394 return sql.cast(sql.null(), types[name]).label(name)
395 else:
396 return sql.type_coerce(sql.null(), types[name]).label(name)
397
398 result = []
399 for type_, table in table_map.items():
400 if typecolname is not None:
401 result.append(
402 sql.select(
403 *(
404 [col(name, table) for name in colnames]
405 + [
406 sql.literal_column(
407 sql_util._quote_ddl_expr(type_)
408 ).label(typecolname)
409 ]
410 )
411 ).select_from(table)
412 )
413 else:
414 result.append(
415 sql.select(
416 *[col(name, table) for name in colnames]
417 ).select_from(table)
418 )
419 return sql.union_all(*result).alias(aliasname)
420
421
422def identity_key(
423 class_: Optional[Type[_T]] = None,
424 ident: Union[Any, Tuple[Any, ...]] = None,
425 *,
426 instance: Optional[_T] = None,
427 row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None,
428 identity_token: Optional[Any] = None,
429) -> _IdentityKeyType[_T]:
430 r"""Generate "identity key" tuples, as are used as keys in the
431 :attr:`.Session.identity_map` dictionary.
432
433 This function has several call styles:
434
435 * ``identity_key(class, ident, identity_token=token)``
436
437 This form receives a mapped class and a primary key scalar or
438 tuple as an argument.
439
440 E.g.::
441
442 >>> identity_key(MyClass, (1, 2))
443 (<class '__main__.MyClass'>, (1, 2), None)
444
445 :param class: mapped class (must be a positional argument)
446 :param ident: primary key, may be a scalar or tuple argument.
447 :param identity_token: optional identity token
448
449 .. versionadded:: 1.2 added identity_token
450
451
452 * ``identity_key(instance=instance)``
453
454 This form will produce the identity key for a given instance. The
455 instance need not be persistent, only that its primary key attributes
456 are populated (else the key will contain ``None`` for those missing
457 values).
458
459 E.g.::
460
461 >>> instance = MyClass(1, 2)
462 >>> identity_key(instance=instance)
463 (<class '__main__.MyClass'>, (1, 2), None)
464
465 In this form, the given instance is ultimately run though
466 :meth:`_orm.Mapper.identity_key_from_instance`, which will have the
467 effect of performing a database check for the corresponding row
468 if the object is expired.
469
470 :param instance: object instance (must be given as a keyword arg)
471
472 * ``identity_key(class, row=row, identity_token=token)``
473
474 This form is similar to the class/tuple form, except is passed a
475 database result row as a :class:`.Row` or :class:`.RowMapping` object.
476
477 E.g.::
478
479 >>> row = engine.execute(\
480 text("select * from table where a=1 and b=2")\
481 ).first()
482 >>> identity_key(MyClass, row=row)
483 (<class '__main__.MyClass'>, (1, 2), None)
484
485 :param class: mapped class (must be a positional argument)
486 :param row: :class:`.Row` row returned by a :class:`_engine.CursorResult`
487 (must be given as a keyword arg)
488 :param identity_token: optional identity token
489
490 .. versionadded:: 1.2 added identity_token
491
492 """
493 if class_ is not None:
494 mapper = class_mapper(class_)
495 if row is None:
496 if ident is None:
497 raise sa_exc.ArgumentError("ident or row is required")
498 return mapper.identity_key_from_primary_key(
499 tuple(util.to_list(ident)), identity_token=identity_token
500 )
501 else:
502 return mapper.identity_key_from_row(
503 row, identity_token=identity_token
504 )
505 elif instance is not None:
506 mapper = object_mapper(instance)
507 return mapper.identity_key_from_instance(instance)
508 else:
509 raise sa_exc.ArgumentError("class or instance is required")
510
511
512class _TraceAdaptRole(enum.Enum):
513 """Enumeration of all the use cases for ORMAdapter.
514
515 ORMAdapter remains one of the most complicated aspects of the ORM, as it is
516 used for in-place adaption of column expressions to be applied to a SELECT,
517 replacing :class:`.Table` and other objects that are mapped to classes with
518 aliases of those tables in the case of joined eager loading, or in the case
519 of polymorphic loading as used with concrete mappings or other custom "with
520 polymorphic" parameters, with whole user-defined subqueries. The
521 enumerations provide an overview of all the use cases used by ORMAdapter, a
522 layer of formality as to the introduction of new ORMAdapter use cases (of
523 which none are anticipated), as well as a means to trace the origins of a
524 particular ORMAdapter within runtime debugging.
525
526 SQLAlchemy 2.0 has greatly scaled back ORM features which relied heavily on
527 open-ended statement adaption, including the ``Query.with_polymorphic()``
528 method and the ``Query.select_from_entity()`` methods, favoring
529 user-explicit aliasing schemes using the ``aliased()`` and
530 ``with_polymorphic()`` standalone constructs; these still use adaption,
531 however the adaption is applied in a narrower scope.
532
533 """
534
535 # aliased() use that is used to adapt individual attributes at query
536 # construction time
537 ALIASED_INSP = enum.auto()
538
539 # joinedload cases; typically adapt an ON clause of a relationship
540 # join
541 JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto()
542 JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto()
543 JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto()
544
545 # polymorphic cases - these are complex ones that replace FROM
546 # clauses, replacing tables with subqueries
547 MAPPER_POLYMORPHIC_ADAPTER = enum.auto()
548 WITH_POLYMORPHIC_ADAPTER = enum.auto()
549 WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto()
550 DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto()
551
552 # the from_statement() case, used only to adapt individual attributes
553 # from a given statement to local ORM attributes at result fetching
554 # time. assigned to ORMCompileState._from_obj_alias
555 ADAPT_FROM_STATEMENT = enum.auto()
556
557 # the joinedload for queries that have LIMIT/OFFSET/DISTINCT case;
558 # the query is placed inside of a subquery with the LIMIT/OFFSET/etc.,
559 # joinedloads are then placed on the outside.
560 # assigned to ORMCompileState.compound_eager_adapter
561 COMPOUND_EAGER_STATEMENT = enum.auto()
562
563 # the legacy Query._set_select_from() case.
564 # this is needed for Query's set operations (i.e. UNION, etc. )
565 # as well as "legacy from_self()", which while removed from 2.0 as
566 # public API, is used for the Query.count() method. this one
567 # still does full statement traversal
568 # assigned to ORMCompileState._from_obj_alias
569 LEGACY_SELECT_FROM_ALIAS = enum.auto()
570
571
572class ORMStatementAdapter(sql_util.ColumnAdapter):
573 """ColumnAdapter which includes a role attribute."""
574
575 __slots__ = ("role",)
576
577 def __init__(
578 self,
579 role: _TraceAdaptRole,
580 selectable: Selectable,
581 *,
582 equivalents: Optional[_EquivalentColumnMap] = None,
583 adapt_required: bool = False,
584 allow_label_resolve: bool = True,
585 anonymize_labels: bool = False,
586 adapt_on_names: bool = False,
587 adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
588 ):
589 self.role = role
590 super().__init__(
591 selectable,
592 equivalents=equivalents,
593 adapt_required=adapt_required,
594 allow_label_resolve=allow_label_resolve,
595 anonymize_labels=anonymize_labels,
596 adapt_on_names=adapt_on_names,
597 adapt_from_selectables=adapt_from_selectables,
598 )
599
600
601class ORMAdapter(sql_util.ColumnAdapter):
602 """ColumnAdapter subclass which excludes adaptation of entities from
603 non-matching mappers.
604
605 """
606
607 __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp")
608
609 is_aliased_class: bool
610 aliased_insp: Optional[AliasedInsp[Any]]
611
612 def __init__(
613 self,
614 role: _TraceAdaptRole,
615 entity: _InternalEntityType[Any],
616 *,
617 equivalents: Optional[_EquivalentColumnMap] = None,
618 adapt_required: bool = False,
619 allow_label_resolve: bool = True,
620 anonymize_labels: bool = False,
621 selectable: Optional[Selectable] = None,
622 limit_on_entity: bool = True,
623 adapt_on_names: bool = False,
624 adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
625 ):
626 self.role = role
627 self.mapper = entity.mapper
628 if selectable is None:
629 selectable = entity.selectable
630 if insp_is_aliased_class(entity):
631 self.is_aliased_class = True
632 self.aliased_insp = entity
633 else:
634 self.is_aliased_class = False
635 self.aliased_insp = None
636
637 super().__init__(
638 selectable,
639 equivalents,
640 adapt_required=adapt_required,
641 allow_label_resolve=allow_label_resolve,
642 anonymize_labels=anonymize_labels,
643 include_fn=self._include_fn if limit_on_entity else None,
644 adapt_on_names=adapt_on_names,
645 adapt_from_selectables=adapt_from_selectables,
646 )
647
648 def _include_fn(self, elem):
649 entity = elem._annotations.get("parentmapper", None)
650
651 return not entity or entity.isa(self.mapper) or self.mapper.isa(entity)
652
653
654class AliasedClass(
655 inspection.Inspectable["AliasedInsp[_O]"], ORMColumnsClauseRole[_O]
656):
657 r"""Represents an "aliased" form of a mapped class for usage with Query.
658
659 The ORM equivalent of a :func:`~sqlalchemy.sql.expression.alias`
660 construct, this object mimics the mapped class using a
661 ``__getattr__`` scheme and maintains a reference to a
662 real :class:`~sqlalchemy.sql.expression.Alias` object.
663
664 A primary purpose of :class:`.AliasedClass` is to serve as an alternate
665 within a SQL statement generated by the ORM, such that an existing
666 mapped entity can be used in multiple contexts. A simple example::
667
668 # find all pairs of users with the same name
669 user_alias = aliased(User)
670 session.query(User, user_alias).\
671 join((user_alias, User.id > user_alias.id)).\
672 filter(User.name == user_alias.name)
673
674 :class:`.AliasedClass` is also capable of mapping an existing mapped
675 class to an entirely new selectable, provided this selectable is column-
676 compatible with the existing mapped selectable, and it can also be
677 configured in a mapping as the target of a :func:`_orm.relationship`.
678 See the links below for examples.
679
680 The :class:`.AliasedClass` object is constructed typically using the
681 :func:`_orm.aliased` function. It also is produced with additional
682 configuration when using the :func:`_orm.with_polymorphic` function.
683
684 The resulting object is an instance of :class:`.AliasedClass`.
685 This object implements an attribute scheme which produces the
686 same attribute and method interface as the original mapped
687 class, allowing :class:`.AliasedClass` to be compatible
688 with any attribute technique which works on the original class,
689 including hybrid attributes (see :ref:`hybrids_toplevel`).
690
691 The :class:`.AliasedClass` can be inspected for its underlying
692 :class:`_orm.Mapper`, aliased selectable, and other information
693 using :func:`_sa.inspect`::
694
695 from sqlalchemy import inspect
696 my_alias = aliased(MyClass)
697 insp = inspect(my_alias)
698
699 The resulting inspection object is an instance of :class:`.AliasedInsp`.
700
701
702 .. seealso::
703
704 :func:`.aliased`
705
706 :func:`.with_polymorphic`
707
708 :ref:`relationship_aliased_class`
709
710 :ref:`relationship_to_window_function`
711
712
713 """
714
715 __name__: str
716
717 def __init__(
718 self,
719 mapped_class_or_ac: _EntityType[_O],
720 alias: Optional[FromClause] = None,
721 name: Optional[str] = None,
722 flat: bool = False,
723 adapt_on_names: bool = False,
724 with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]] = None,
725 with_polymorphic_discriminator: Optional[ColumnElement[Any]] = None,
726 base_alias: Optional[AliasedInsp[Any]] = None,
727 use_mapper_path: bool = False,
728 represents_outer_join: bool = False,
729 ):
730 insp = cast(
731 "_InternalEntityType[_O]", inspection.inspect(mapped_class_or_ac)
732 )
733 mapper = insp.mapper
734
735 nest_adapters = False
736
737 if alias is None:
738 if insp.is_aliased_class and insp.selectable._is_subquery:
739 alias = insp.selectable.alias()
740 else:
741 alias = (
742 mapper._with_polymorphic_selectable._anonymous_fromclause(
743 name=name,
744 flat=flat,
745 )
746 )
747 elif insp.is_aliased_class:
748 nest_adapters = True
749
750 assert alias is not None
751 self._aliased_insp = AliasedInsp(
752 self,
753 insp,
754 alias,
755 name,
756 (
757 with_polymorphic_mappers
758 if with_polymorphic_mappers
759 else mapper.with_polymorphic_mappers
760 ),
761 (
762 with_polymorphic_discriminator
763 if with_polymorphic_discriminator is not None
764 else mapper.polymorphic_on
765 ),
766 base_alias,
767 use_mapper_path,
768 adapt_on_names,
769 represents_outer_join,
770 nest_adapters,
771 )
772
773 self.__name__ = f"aliased({mapper.class_.__name__})"
774
775 @classmethod
776 def _reconstitute_from_aliased_insp(
777 cls, aliased_insp: AliasedInsp[_O]
778 ) -> AliasedClass[_O]:
779 obj = cls.__new__(cls)
780 obj.__name__ = f"aliased({aliased_insp.mapper.class_.__name__})"
781 obj._aliased_insp = aliased_insp
782
783 if aliased_insp._is_with_polymorphic:
784 for sub_aliased_insp in aliased_insp._with_polymorphic_entities:
785 if sub_aliased_insp is not aliased_insp:
786 ent = AliasedClass._reconstitute_from_aliased_insp(
787 sub_aliased_insp
788 )
789 setattr(obj, sub_aliased_insp.class_.__name__, ent)
790
791 return obj
792
793 def __getattr__(self, key: str) -> Any:
794 try:
795 _aliased_insp = self.__dict__["_aliased_insp"]
796 except KeyError:
797 raise AttributeError()
798 else:
799 target = _aliased_insp._target
800 # maintain all getattr mechanics
801 attr = getattr(target, key)
802
803 # attribute is a method, that will be invoked against a
804 # "self"; so just return a new method with the same function and
805 # new self
806 if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
807 return types.MethodType(attr.__func__, self)
808
809 # attribute is a descriptor, that will be invoked against a
810 # "self"; so invoke the descriptor against this self
811 if hasattr(attr, "__get__"):
812 attr = attr.__get__(None, self)
813
814 # attributes within the QueryableAttribute system will want this
815 # to be invoked so the object can be adapted
816 if hasattr(attr, "adapt_to_entity"):
817 attr = attr.adapt_to_entity(_aliased_insp)
818 setattr(self, key, attr)
819
820 return attr
821
822 def _get_from_serialized(
823 self, key: str, mapped_class: _O, aliased_insp: AliasedInsp[_O]
824 ) -> Any:
825 # this method is only used in terms of the
826 # sqlalchemy.ext.serializer extension
827 attr = getattr(mapped_class, key)
828 if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
829 return types.MethodType(attr.__func__, self)
830
831 # attribute is a descriptor, that will be invoked against a
832 # "self"; so invoke the descriptor against this self
833 if hasattr(attr, "__get__"):
834 attr = attr.__get__(None, self)
835
836 # attributes within the QueryableAttribute system will want this
837 # to be invoked so the object can be adapted
838 if hasattr(attr, "adapt_to_entity"):
839 aliased_insp._weak_entity = weakref.ref(self)
840 attr = attr.adapt_to_entity(aliased_insp)
841 setattr(self, key, attr)
842
843 return attr
844
845 def __repr__(self) -> str:
846 return "<AliasedClass at 0x%x; %s>" % (
847 id(self),
848 self._aliased_insp._target.__name__,
849 )
850
851 def __str__(self) -> str:
852 return str(self._aliased_insp)
853
854
855@inspection._self_inspects
856class AliasedInsp(
857 ORMEntityColumnsClauseRole[_O],
858 ORMFromClauseRole,
859 HasCacheKey,
860 InspectionAttr,
861 MemoizedSlots,
862 inspection.Inspectable["AliasedInsp[_O]"],
863 Generic[_O],
864):
865 """Provide an inspection interface for an
866 :class:`.AliasedClass` object.
867
868 The :class:`.AliasedInsp` object is returned
869 given an :class:`.AliasedClass` using the
870 :func:`_sa.inspect` function::
871
872 from sqlalchemy import inspect
873 from sqlalchemy.orm import aliased
874
875 my_alias = aliased(MyMappedClass)
876 insp = inspect(my_alias)
877
878 Attributes on :class:`.AliasedInsp`
879 include:
880
881 * ``entity`` - the :class:`.AliasedClass` represented.
882 * ``mapper`` - the :class:`_orm.Mapper` mapping the underlying class.
883 * ``selectable`` - the :class:`_expression.Alias`
884 construct which ultimately
885 represents an aliased :class:`_schema.Table` or
886 :class:`_expression.Select`
887 construct.
888 * ``name`` - the name of the alias. Also is used as the attribute
889 name when returned in a result tuple from :class:`_query.Query`.
890 * ``with_polymorphic_mappers`` - collection of :class:`_orm.Mapper`
891 objects
892 indicating all those mappers expressed in the select construct
893 for the :class:`.AliasedClass`.
894 * ``polymorphic_on`` - an alternate column or SQL expression which
895 will be used as the "discriminator" for a polymorphic load.
896
897 .. seealso::
898
899 :ref:`inspection_toplevel`
900
901 """
902
903 __slots__ = (
904 "__weakref__",
905 "_weak_entity",
906 "mapper",
907 "selectable",
908 "name",
909 "_adapt_on_names",
910 "with_polymorphic_mappers",
911 "polymorphic_on",
912 "_use_mapper_path",
913 "_base_alias",
914 "represents_outer_join",
915 "persist_selectable",
916 "local_table",
917 "_is_with_polymorphic",
918 "_with_polymorphic_entities",
919 "_adapter",
920 "_target",
921 "__clause_element__",
922 "_memoized_values",
923 "_all_column_expressions",
924 "_nest_adapters",
925 )
926
927 _cache_key_traversal = [
928 ("name", visitors.ExtendedInternalTraversal.dp_string),
929 ("_adapt_on_names", visitors.ExtendedInternalTraversal.dp_boolean),
930 ("_use_mapper_path", visitors.ExtendedInternalTraversal.dp_boolean),
931 ("_target", visitors.ExtendedInternalTraversal.dp_inspectable),
932 ("selectable", visitors.ExtendedInternalTraversal.dp_clauseelement),
933 (
934 "with_polymorphic_mappers",
935 visitors.InternalTraversal.dp_has_cache_key_list,
936 ),
937 ("polymorphic_on", visitors.InternalTraversal.dp_clauseelement),
938 ]
939
940 mapper: Mapper[_O]
941 selectable: FromClause
942 _adapter: ORMAdapter
943 with_polymorphic_mappers: Sequence[Mapper[Any]]
944 _with_polymorphic_entities: Sequence[AliasedInsp[Any]]
945
946 _weak_entity: weakref.ref[AliasedClass[_O]]
947 """the AliasedClass that refers to this AliasedInsp"""
948
949 _target: Union[Type[_O], AliasedClass[_O]]
950 """the thing referenced by the AliasedClass/AliasedInsp.
951
952 In the vast majority of cases, this is the mapped class. However
953 it may also be another AliasedClass (alias of alias).
954
955 """
956
957 def __init__(
958 self,
959 entity: AliasedClass[_O],
960 inspected: _InternalEntityType[_O],
961 selectable: FromClause,
962 name: Optional[str],
963 with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]],
964 polymorphic_on: Optional[ColumnElement[Any]],
965 _base_alias: Optional[AliasedInsp[Any]],
966 _use_mapper_path: bool,
967 adapt_on_names: bool,
968 represents_outer_join: bool,
969 nest_adapters: bool,
970 ):
971 mapped_class_or_ac = inspected.entity
972 mapper = inspected.mapper
973
974 self._weak_entity = weakref.ref(entity)
975 self.mapper = mapper
976 self.selectable = self.persist_selectable = self.local_table = (
977 selectable
978 )
979 self.name = name
980 self.polymorphic_on = polymorphic_on
981 self._base_alias = weakref.ref(_base_alias or self)
982 self._use_mapper_path = _use_mapper_path
983 self.represents_outer_join = represents_outer_join
984 self._nest_adapters = nest_adapters
985
986 if with_polymorphic_mappers:
987 self._is_with_polymorphic = True
988 self.with_polymorphic_mappers = with_polymorphic_mappers
989 self._with_polymorphic_entities = []
990 for poly in self.with_polymorphic_mappers:
991 if poly is not mapper:
992 ent = AliasedClass(
993 poly.class_,
994 selectable,
995 base_alias=self,
996 adapt_on_names=adapt_on_names,
997 use_mapper_path=_use_mapper_path,
998 )
999
1000 setattr(self.entity, poly.class_.__name__, ent)
1001 self._with_polymorphic_entities.append(ent._aliased_insp)
1002
1003 else:
1004 self._is_with_polymorphic = False
1005 self.with_polymorphic_mappers = [mapper]
1006
1007 self._adapter = ORMAdapter(
1008 _TraceAdaptRole.ALIASED_INSP,
1009 mapper,
1010 selectable=selectable,
1011 equivalents=mapper._equivalent_columns,
1012 adapt_on_names=adapt_on_names,
1013 anonymize_labels=True,
1014 # make sure the adapter doesn't try to grab other tables that
1015 # are not even the thing we are mapping, such as embedded
1016 # selectables in subqueries or CTEs. See issue #6060
1017 adapt_from_selectables={
1018 m.selectable
1019 for m in self.with_polymorphic_mappers
1020 if not adapt_on_names
1021 },
1022 limit_on_entity=False,
1023 )
1024
1025 if nest_adapters:
1026 # supports "aliased class of aliased class" use case
1027 assert isinstance(inspected, AliasedInsp)
1028 self._adapter = inspected._adapter.wrap(self._adapter)
1029
1030 self._adapt_on_names = adapt_on_names
1031 self._target = mapped_class_or_ac
1032
1033 @classmethod
1034 def _alias_factory(
1035 cls,
1036 element: Union[_EntityType[_O], FromClause],
1037 alias: Optional[FromClause] = None,
1038 name: Optional[str] = None,
1039 flat: bool = False,
1040 adapt_on_names: bool = False,
1041 ) -> Union[AliasedClass[_O], FromClause]:
1042 if isinstance(element, FromClause):
1043 if adapt_on_names:
1044 raise sa_exc.ArgumentError(
1045 "adapt_on_names only applies to ORM elements"
1046 )
1047 if name:
1048 return element.alias(name=name, flat=flat)
1049 else:
1050 return coercions.expect(
1051 roles.AnonymizedFromClauseRole, element, flat=flat
1052 )
1053 else:
1054 return AliasedClass(
1055 element,
1056 alias=alias,
1057 flat=flat,
1058 name=name,
1059 adapt_on_names=adapt_on_names,
1060 )
1061
1062 @classmethod
1063 def _with_polymorphic_factory(
1064 cls,
1065 base: Union[Type[_O], Mapper[_O]],
1066 classes: Union[Literal["*"], Iterable[_EntityType[Any]]],
1067 selectable: Union[Literal[False, None], FromClause] = False,
1068 flat: bool = False,
1069 polymorphic_on: Optional[ColumnElement[Any]] = None,
1070 aliased: bool = False,
1071 innerjoin: bool = False,
1072 adapt_on_names: bool = False,
1073 name: Optional[str] = None,
1074 _use_mapper_path: bool = False,
1075 ) -> AliasedClass[_O]:
1076 primary_mapper = _class_to_mapper(base)
1077
1078 if selectable not in (None, False) and flat:
1079 raise sa_exc.ArgumentError(
1080 "the 'flat' and 'selectable' arguments cannot be passed "
1081 "simultaneously to with_polymorphic()"
1082 )
1083
1084 mappers, selectable = primary_mapper._with_polymorphic_args(
1085 classes, selectable, innerjoin=innerjoin
1086 )
1087 if aliased or flat:
1088 assert selectable is not None
1089 selectable = selectable._anonymous_fromclause(flat=flat)
1090
1091 return AliasedClass(
1092 base,
1093 selectable,
1094 name=name,
1095 with_polymorphic_mappers=mappers,
1096 adapt_on_names=adapt_on_names,
1097 with_polymorphic_discriminator=polymorphic_on,
1098 use_mapper_path=_use_mapper_path,
1099 represents_outer_join=not innerjoin,
1100 )
1101
1102 @property
1103 def entity(self) -> AliasedClass[_O]:
1104 # to eliminate reference cycles, the AliasedClass is held weakly.
1105 # this produces some situations where the AliasedClass gets lost,
1106 # particularly when one is created internally and only the AliasedInsp
1107 # is passed around.
1108 # to work around this case, we just generate a new one when we need
1109 # it, as it is a simple class with very little initial state on it.
1110 ent = self._weak_entity()
1111 if ent is None:
1112 ent = AliasedClass._reconstitute_from_aliased_insp(self)
1113 self._weak_entity = weakref.ref(ent)
1114 return ent
1115
1116 is_aliased_class = True
1117 "always returns True"
1118
1119 def _memoized_method___clause_element__(self) -> FromClause:
1120 return self.selectable._annotate(
1121 {
1122 "parentmapper": self.mapper,
1123 "parententity": self,
1124 "entity_namespace": self,
1125 }
1126 )._set_propagate_attrs(
1127 {"compile_state_plugin": "orm", "plugin_subject": self}
1128 )
1129
1130 @property
1131 def entity_namespace(self) -> AliasedClass[_O]:
1132 return self.entity
1133
1134 @property
1135 def class_(self) -> Type[_O]:
1136 """Return the mapped class ultimately represented by this
1137 :class:`.AliasedInsp`."""
1138 return self.mapper.class_
1139
1140 @property
1141 def _path_registry(self) -> AbstractEntityRegistry:
1142 if self._use_mapper_path:
1143 return self.mapper._path_registry
1144 else:
1145 return PathRegistry.per_mapper(self)
1146
1147 def __getstate__(self) -> Dict[str, Any]:
1148 return {
1149 "entity": self.entity,
1150 "mapper": self.mapper,
1151 "alias": self.selectable,
1152 "name": self.name,
1153 "adapt_on_names": self._adapt_on_names,
1154 "with_polymorphic_mappers": self.with_polymorphic_mappers,
1155 "with_polymorphic_discriminator": self.polymorphic_on,
1156 "base_alias": self._base_alias(),
1157 "use_mapper_path": self._use_mapper_path,
1158 "represents_outer_join": self.represents_outer_join,
1159 "nest_adapters": self._nest_adapters,
1160 }
1161
1162 def __setstate__(self, state: Dict[str, Any]) -> None:
1163 self.__init__( # type: ignore
1164 state["entity"],
1165 state["mapper"],
1166 state["alias"],
1167 state["name"],
1168 state["with_polymorphic_mappers"],
1169 state["with_polymorphic_discriminator"],
1170 state["base_alias"],
1171 state["use_mapper_path"],
1172 state["adapt_on_names"],
1173 state["represents_outer_join"],
1174 state["nest_adapters"],
1175 )
1176
1177 def _merge_with(self, other: AliasedInsp[_O]) -> AliasedInsp[_O]:
1178 # assert self._is_with_polymorphic
1179 # assert other._is_with_polymorphic
1180
1181 primary_mapper = other.mapper
1182
1183 assert self.mapper is primary_mapper
1184
1185 our_classes = util.to_set(
1186 mp.class_ for mp in self.with_polymorphic_mappers
1187 )
1188 new_classes = {mp.class_ for mp in other.with_polymorphic_mappers}
1189 if our_classes == new_classes:
1190 return other
1191 else:
1192 classes = our_classes.union(new_classes)
1193
1194 mappers, selectable = primary_mapper._with_polymorphic_args(
1195 classes, None, innerjoin=not other.represents_outer_join
1196 )
1197 selectable = selectable._anonymous_fromclause(flat=True)
1198 return AliasedClass(
1199 primary_mapper,
1200 selectable,
1201 with_polymorphic_mappers=mappers,
1202 with_polymorphic_discriminator=other.polymorphic_on,
1203 use_mapper_path=other._use_mapper_path,
1204 represents_outer_join=other.represents_outer_join,
1205 )._aliased_insp
1206
1207 def _adapt_element(
1208 self, expr: _ORMCOLEXPR, key: Optional[str] = None
1209 ) -> _ORMCOLEXPR:
1210 assert isinstance(expr, ColumnElement)
1211 d: Dict[str, Any] = {
1212 "parententity": self,
1213 "parentmapper": self.mapper,
1214 }
1215 if key:
1216 d["proxy_key"] = key
1217
1218 # IMO mypy should see this one also as returning the same type
1219 # we put into it, but it's not
1220 return (
1221 self._adapter.traverse(expr)
1222 ._annotate(d)
1223 ._set_propagate_attrs(
1224 {"compile_state_plugin": "orm", "plugin_subject": self}
1225 )
1226 )
1227
1228 if TYPE_CHECKING:
1229 # establish compatibility with the _ORMAdapterProto protocol,
1230 # which in turn is compatible with _CoreAdapterProto.
1231
1232 def _orm_adapt_element(
1233 self,
1234 obj: _CE,
1235 key: Optional[str] = None,
1236 ) -> _CE: ...
1237
1238 else:
1239 _orm_adapt_element = _adapt_element
1240
1241 def _entity_for_mapper(self, mapper):
1242 self_poly = self.with_polymorphic_mappers
1243 if mapper in self_poly:
1244 if mapper is self.mapper:
1245 return self
1246 else:
1247 return getattr(
1248 self.entity, mapper.class_.__name__
1249 )._aliased_insp
1250 elif mapper.isa(self.mapper):
1251 return self
1252 else:
1253 assert False, "mapper %s doesn't correspond to %s" % (mapper, self)
1254
1255 def _memoized_attr__get_clause(self):
1256 onclause, replacemap = self.mapper._get_clause
1257 return (
1258 self._adapter.traverse(onclause),
1259 {
1260 self._adapter.traverse(col): param
1261 for col, param in replacemap.items()
1262 },
1263 )
1264
1265 def _memoized_attr__memoized_values(self):
1266 return {}
1267
1268 def _memoized_attr__all_column_expressions(self):
1269 if self._is_with_polymorphic:
1270 cols_plus_keys = self.mapper._columns_plus_keys(
1271 [ent.mapper for ent in self._with_polymorphic_entities]
1272 )
1273 else:
1274 cols_plus_keys = self.mapper._columns_plus_keys()
1275
1276 cols_plus_keys = [
1277 (key, self._adapt_element(col)) for key, col in cols_plus_keys
1278 ]
1279
1280 return ColumnCollection(cols_plus_keys)
1281
1282 def _memo(self, key, callable_, *args, **kw):
1283 if key in self._memoized_values:
1284 return self._memoized_values[key]
1285 else:
1286 self._memoized_values[key] = value = callable_(*args, **kw)
1287 return value
1288
1289 def __repr__(self):
1290 if self.with_polymorphic_mappers:
1291 with_poly = "(%s)" % ", ".join(
1292 mp.class_.__name__ for mp in self.with_polymorphic_mappers
1293 )
1294 else:
1295 with_poly = ""
1296 return "<AliasedInsp at 0x%x; %s%s>" % (
1297 id(self),
1298 self.class_.__name__,
1299 with_poly,
1300 )
1301
1302 def __str__(self):
1303 if self._is_with_polymorphic:
1304 return "with_polymorphic(%s, [%s])" % (
1305 self._target.__name__,
1306 ", ".join(
1307 mp.class_.__name__
1308 for mp in self.with_polymorphic_mappers
1309 if mp is not self.mapper
1310 ),
1311 )
1312 else:
1313 return "aliased(%s)" % (self._target.__name__,)
1314
1315
1316class _WrapUserEntity:
1317 """A wrapper used within the loader_criteria lambda caller so that
1318 we can bypass declared_attr descriptors on unmapped mixins, which
1319 normally emit a warning for such use.
1320
1321 might also be useful for other per-lambda instrumentations should
1322 the need arise.
1323
1324 """
1325
1326 __slots__ = ("subject",)
1327
1328 def __init__(self, subject):
1329 self.subject = subject
1330
1331 @util.preload_module("sqlalchemy.orm.decl_api")
1332 def __getattribute__(self, name):
1333 decl_api = util.preloaded.orm.decl_api
1334
1335 subject = object.__getattribute__(self, "subject")
1336 if name in subject.__dict__ and isinstance(
1337 subject.__dict__[name], decl_api.declared_attr
1338 ):
1339 return subject.__dict__[name].fget(subject)
1340 else:
1341 return getattr(subject, name)
1342
1343
1344class LoaderCriteriaOption(CriteriaOption):
1345 """Add additional WHERE criteria to the load for all occurrences of
1346 a particular entity.
1347
1348 :class:`_orm.LoaderCriteriaOption` is invoked using the
1349 :func:`_orm.with_loader_criteria` function; see that function for
1350 details.
1351
1352 .. versionadded:: 1.4
1353
1354 """
1355
1356 __slots__ = (
1357 "root_entity",
1358 "entity",
1359 "deferred_where_criteria",
1360 "where_criteria",
1361 "_where_crit_orig",
1362 "include_aliases",
1363 "propagate_to_loaders",
1364 )
1365
1366 _traverse_internals = [
1367 ("root_entity", visitors.ExtendedInternalTraversal.dp_plain_obj),
1368 ("entity", visitors.ExtendedInternalTraversal.dp_has_cache_key),
1369 ("where_criteria", visitors.InternalTraversal.dp_clauseelement),
1370 ("include_aliases", visitors.InternalTraversal.dp_boolean),
1371 ("propagate_to_loaders", visitors.InternalTraversal.dp_boolean),
1372 ]
1373
1374 root_entity: Optional[Type[Any]]
1375 entity: Optional[_InternalEntityType[Any]]
1376 where_criteria: Union[ColumnElement[bool], lambdas.DeferredLambdaElement]
1377 deferred_where_criteria: bool
1378 include_aliases: bool
1379 propagate_to_loaders: bool
1380
1381 _where_crit_orig: Any
1382
1383 def __init__(
1384 self,
1385 entity_or_base: _EntityType[Any],
1386 where_criteria: Union[
1387 _ColumnExpressionArgument[bool],
1388 Callable[[Any], _ColumnExpressionArgument[bool]],
1389 ],
1390 loader_only: bool = False,
1391 include_aliases: bool = False,
1392 propagate_to_loaders: bool = True,
1393 track_closure_variables: bool = True,
1394 ):
1395 entity = cast(
1396 "_InternalEntityType[Any]",
1397 inspection.inspect(entity_or_base, False),
1398 )
1399 if entity is None:
1400 self.root_entity = cast("Type[Any]", entity_or_base)
1401 self.entity = None
1402 else:
1403 self.root_entity = None
1404 self.entity = entity
1405
1406 self._where_crit_orig = where_criteria
1407 if callable(where_criteria):
1408 if self.root_entity is not None:
1409 wrap_entity = self.root_entity
1410 else:
1411 assert entity is not None
1412 wrap_entity = entity.entity
1413
1414 self.deferred_where_criteria = True
1415 self.where_criteria = lambdas.DeferredLambdaElement(
1416 where_criteria,
1417 roles.WhereHavingRole,
1418 lambda_args=(_WrapUserEntity(wrap_entity),),
1419 opts=lambdas.LambdaOptions(
1420 track_closure_variables=track_closure_variables
1421 ),
1422 )
1423 else:
1424 self.deferred_where_criteria = False
1425 self.where_criteria = coercions.expect(
1426 roles.WhereHavingRole, where_criteria
1427 )
1428
1429 self.include_aliases = include_aliases
1430 self.propagate_to_loaders = propagate_to_loaders
1431
1432 @classmethod
1433 def _unreduce(
1434 cls, entity, where_criteria, include_aliases, propagate_to_loaders
1435 ):
1436 return LoaderCriteriaOption(
1437 entity,
1438 where_criteria,
1439 include_aliases=include_aliases,
1440 propagate_to_loaders=propagate_to_loaders,
1441 )
1442
1443 def __reduce__(self):
1444 return (
1445 LoaderCriteriaOption._unreduce,
1446 (
1447 self.entity.class_ if self.entity else self.root_entity,
1448 self._where_crit_orig,
1449 self.include_aliases,
1450 self.propagate_to_loaders,
1451 ),
1452 )
1453
1454 def _all_mappers(self) -> Iterator[Mapper[Any]]:
1455 if self.entity:
1456 yield from self.entity.mapper.self_and_descendants
1457 else:
1458 assert self.root_entity
1459 stack = list(self.root_entity.__subclasses__())
1460 while stack:
1461 subclass = stack.pop(0)
1462 ent = cast(
1463 "_InternalEntityType[Any]",
1464 inspection.inspect(subclass, raiseerr=False),
1465 )
1466 if ent:
1467 yield from ent.mapper.self_and_descendants
1468 else:
1469 stack.extend(subclass.__subclasses__())
1470
1471 def _should_include(self, compile_state: ORMCompileState) -> bool:
1472 if (
1473 compile_state.select_statement._annotations.get(
1474 "for_loader_criteria", None
1475 )
1476 is self
1477 ):
1478 return False
1479 return True
1480
1481 def _resolve_where_criteria(
1482 self, ext_info: _InternalEntityType[Any]
1483 ) -> ColumnElement[bool]:
1484 if self.deferred_where_criteria:
1485 crit = cast(
1486 "ColumnElement[bool]",
1487 self.where_criteria._resolve_with_args(ext_info.entity),
1488 )
1489 else:
1490 crit = self.where_criteria # type: ignore
1491 assert isinstance(crit, ColumnElement)
1492 return sql_util._deep_annotate(
1493 crit,
1494 {"for_loader_criteria": self},
1495 detect_subquery_cols=True,
1496 ind_cols_on_fromclause=True,
1497 )
1498
1499 def process_compile_state_replaced_entities(
1500 self,
1501 compile_state: ORMCompileState,
1502 mapper_entities: Iterable[_MapperEntity],
1503 ) -> None:
1504 self.process_compile_state(compile_state)
1505
1506 def process_compile_state(self, compile_state: ORMCompileState) -> None:
1507 """Apply a modification to a given :class:`.CompileState`."""
1508
1509 # if options to limit the criteria to immediate query only,
1510 # use compile_state.attributes instead
1511
1512 self.get_global_criteria(compile_state.global_attributes)
1513
1514 def get_global_criteria(self, attributes: Dict[Any, Any]) -> None:
1515 for mp in self._all_mappers():
1516 load_criteria = attributes.setdefault(
1517 ("additional_entity_criteria", mp), []
1518 )
1519
1520 load_criteria.append(self)
1521
1522
1523inspection._inspects(AliasedClass)(lambda target: target._aliased_insp)
1524
1525
1526@inspection._inspects(type)
1527def _inspect_mc(
1528 class_: Type[_O],
1529) -> Optional[Mapper[_O]]:
1530 try:
1531 class_manager = opt_manager_of_class(class_)
1532 if class_manager is None or not class_manager.is_mapped:
1533 return None
1534 mapper = class_manager.mapper
1535 except exc.NO_STATE:
1536 return None
1537 else:
1538 return mapper
1539
1540
1541GenericAlias = type(List[Any])
1542
1543
1544@inspection._inspects(GenericAlias)
1545def _inspect_generic_alias(
1546 class_: Type[_O],
1547) -> Optional[Mapper[_O]]:
1548 origin = cast("Type[_O]", typing_get_origin(class_))
1549 return _inspect_mc(origin)
1550
1551
1552@inspection._self_inspects
1553class Bundle(
1554 ORMColumnsClauseRole[_T],
1555 SupportsCloneAnnotations,
1556 MemoizedHasCacheKey,
1557 inspection.Inspectable["Bundle[_T]"],
1558 InspectionAttr,
1559):
1560 """A grouping of SQL expressions that are returned by a :class:`.Query`
1561 under one namespace.
1562
1563 The :class:`.Bundle` essentially allows nesting of the tuple-based
1564 results returned by a column-oriented :class:`_query.Query` object.
1565 It also
1566 is extensible via simple subclassing, where the primary capability
1567 to override is that of how the set of expressions should be returned,
1568 allowing post-processing as well as custom return types, without
1569 involving ORM identity-mapped classes.
1570
1571 .. seealso::
1572
1573 :ref:`bundles`
1574
1575
1576 """
1577
1578 single_entity = False
1579 """If True, queries for a single Bundle will be returned as a single
1580 entity, rather than an element within a keyed tuple."""
1581
1582 is_clause_element = False
1583
1584 is_mapper = False
1585
1586 is_aliased_class = False
1587
1588 is_bundle = True
1589
1590 _propagate_attrs: _PropagateAttrsType = util.immutabledict()
1591
1592 proxy_set = util.EMPTY_SET # type: ignore
1593
1594 exprs: List[_ColumnsClauseElement]
1595
1596 def __init__(
1597 self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
1598 ):
1599 r"""Construct a new :class:`.Bundle`.
1600
1601 e.g.::
1602
1603 bn = Bundle("mybundle", MyClass.x, MyClass.y)
1604
1605 for row in session.query(bn).filter(
1606 bn.c.x == 5).filter(bn.c.y == 4):
1607 print(row.mybundle.x, row.mybundle.y)
1608
1609 :param name: name of the bundle.
1610 :param \*exprs: columns or SQL expressions comprising the bundle.
1611 :param single_entity=False: if True, rows for this :class:`.Bundle`
1612 can be returned as a "single entity" outside of any enclosing tuple
1613 in the same manner as a mapped entity.
1614
1615 """
1616 self.name = self._label = name
1617 coerced_exprs = [
1618 coercions.expect(
1619 roles.ColumnsClauseRole, expr, apply_propagate_attrs=self
1620 )
1621 for expr in exprs
1622 ]
1623 self.exprs = coerced_exprs
1624
1625 self.c = self.columns = ColumnCollection(
1626 (getattr(col, "key", col._label), col)
1627 for col in [e._annotations.get("bundle", e) for e in coerced_exprs]
1628 ).as_readonly()
1629 self.single_entity = kw.pop("single_entity", self.single_entity)
1630
1631 def _gen_cache_key(
1632 self, anon_map: anon_map, bindparams: List[BindParameter[Any]]
1633 ) -> Tuple[Any, ...]:
1634 return (self.__class__, self.name, self.single_entity) + tuple(
1635 [expr._gen_cache_key(anon_map, bindparams) for expr in self.exprs]
1636 )
1637
1638 @property
1639 def mapper(self) -> Optional[Mapper[Any]]:
1640 mp: Optional[Mapper[Any]] = self.exprs[0]._annotations.get(
1641 "parentmapper", None
1642 )
1643 return mp
1644
1645 @property
1646 def entity(self) -> Optional[_InternalEntityType[Any]]:
1647 ie: Optional[_InternalEntityType[Any]] = self.exprs[
1648 0
1649 ]._annotations.get("parententity", None)
1650 return ie
1651
1652 @property
1653 def entity_namespace(
1654 self,
1655 ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]:
1656 return self.c
1657
1658 columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
1659
1660 """A namespace of SQL expressions referred to by this :class:`.Bundle`.
1661
1662 e.g.::
1663
1664 bn = Bundle("mybundle", MyClass.x, MyClass.y)
1665
1666 q = sess.query(bn).filter(bn.c.x == 5)
1667
1668 Nesting of bundles is also supported::
1669
1670 b1 = Bundle("b1",
1671 Bundle('b2', MyClass.a, MyClass.b),
1672 Bundle('b3', MyClass.x, MyClass.y)
1673 )
1674
1675 q = sess.query(b1).filter(
1676 b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9)
1677
1678 .. seealso::
1679
1680 :attr:`.Bundle.c`
1681
1682 """
1683
1684 c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
1685 """An alias for :attr:`.Bundle.columns`."""
1686
1687 def _clone(self, **kw):
1688 cloned = self.__class__.__new__(self.__class__)
1689 cloned.__dict__.update(self.__dict__)
1690 return cloned
1691
1692 def __clause_element__(self):
1693 # ensure existing entity_namespace remains
1694 annotations = {"bundle": self, "entity_namespace": self}
1695 annotations.update(self._annotations)
1696
1697 plugin_subject = self.exprs[0]._propagate_attrs.get(
1698 "plugin_subject", self.entity
1699 )
1700 return (
1701 expression.ClauseList(
1702 _literal_as_text_role=roles.ColumnsClauseRole,
1703 group=False,
1704 *[e._annotations.get("bundle", e) for e in self.exprs],
1705 )
1706 ._annotate(annotations)
1707 ._set_propagate_attrs(
1708 # the Bundle *must* use the orm plugin no matter what. the
1709 # subject can be None but it's much better if it's not.
1710 {
1711 "compile_state_plugin": "orm",
1712 "plugin_subject": plugin_subject,
1713 }
1714 )
1715 )
1716
1717 @property
1718 def clauses(self):
1719 return self.__clause_element__().clauses
1720
1721 def label(self, name):
1722 """Provide a copy of this :class:`.Bundle` passing a new label."""
1723
1724 cloned = self._clone()
1725 cloned.name = name
1726 return cloned
1727
1728 def create_row_processor(
1729 self,
1730 query: Select[Unpack[TupleAny]],
1731 procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]],
1732 labels: Sequence[str],
1733 ) -> Callable[[Row[Unpack[TupleAny]]], Any]:
1734 """Produce the "row processing" function for this :class:`.Bundle`.
1735
1736 May be overridden by subclasses to provide custom behaviors when
1737 results are fetched. The method is passed the statement object and a
1738 set of "row processor" functions at query execution time; these
1739 processor functions when given a result row will return the individual
1740 attribute value, which can then be adapted into any kind of return data
1741 structure.
1742
1743 The example below illustrates replacing the usual :class:`.Row`
1744 return structure with a straight Python dictionary::
1745
1746 from sqlalchemy.orm import Bundle
1747
1748 class DictBundle(Bundle):
1749 def create_row_processor(self, query, procs, labels):
1750 'Override create_row_processor to return values as
1751 dictionaries'
1752
1753 def proc(row):
1754 return dict(
1755 zip(labels, (proc(row) for proc in procs))
1756 )
1757 return proc
1758
1759 A result from the above :class:`_orm.Bundle` will return dictionary
1760 values::
1761
1762 bn = DictBundle('mybundle', MyClass.data1, MyClass.data2)
1763 for row in session.execute(select(bn)).where(bn.c.data1 == 'd1'):
1764 print(row.mybundle['data1'], row.mybundle['data2'])
1765
1766 """
1767 keyed_tuple = result_tuple(labels, [() for l in labels])
1768
1769 def proc(row: Row[Unpack[TupleAny]]) -> Any:
1770 return keyed_tuple([proc(row) for proc in procs])
1771
1772 return proc
1773
1774
1775def _orm_annotate(element: _SA, exclude: Optional[Any] = None) -> _SA:
1776 """Deep copy the given ClauseElement, annotating each element with the
1777 "_orm_adapt" flag.
1778
1779 Elements within the exclude collection will be cloned but not annotated.
1780
1781 """
1782 return sql_util._deep_annotate(element, {"_orm_adapt": True}, exclude)
1783
1784
1785def _orm_deannotate(element: _SA) -> _SA:
1786 """Remove annotations that link a column to a particular mapping.
1787
1788 Note this doesn't affect "remote" and "foreign" annotations
1789 passed by the :func:`_orm.foreign` and :func:`_orm.remote`
1790 annotators.
1791
1792 """
1793
1794 return sql_util._deep_deannotate(
1795 element, values=("_orm_adapt", "parententity")
1796 )
1797
1798
1799def _orm_full_deannotate(element: _SA) -> _SA:
1800 return sql_util._deep_deannotate(element)
1801
1802
1803class _ORMJoin(expression.Join):
1804 """Extend Join to support ORM constructs as input."""
1805
1806 __visit_name__ = expression.Join.__visit_name__
1807
1808 inherit_cache = True
1809
1810 def __init__(
1811 self,
1812 left: _FromClauseArgument,
1813 right: _FromClauseArgument,
1814 onclause: Optional[_OnClauseArgument] = None,
1815 isouter: bool = False,
1816 full: bool = False,
1817 _left_memo: Optional[Any] = None,
1818 _right_memo: Optional[Any] = None,
1819 _extra_criteria: Tuple[ColumnElement[bool], ...] = (),
1820 ):
1821 left_info = cast(
1822 "Union[FromClause, _InternalEntityType[Any]]",
1823 inspection.inspect(left),
1824 )
1825
1826 right_info = cast(
1827 "Union[FromClause, _InternalEntityType[Any]]",
1828 inspection.inspect(right),
1829 )
1830 adapt_to = right_info.selectable
1831
1832 # used by joined eager loader
1833 self._left_memo = _left_memo
1834 self._right_memo = _right_memo
1835
1836 if isinstance(onclause, attributes.QueryableAttribute):
1837 if TYPE_CHECKING:
1838 assert isinstance(
1839 onclause.comparator, RelationshipProperty.Comparator
1840 )
1841 on_selectable = onclause.comparator._source_selectable()
1842 prop = onclause.property
1843 _extra_criteria += onclause._extra_criteria
1844 elif isinstance(onclause, MapperProperty):
1845 # used internally by joined eager loader...possibly not ideal
1846 prop = onclause
1847 on_selectable = prop.parent.selectable
1848 else:
1849 prop = None
1850 on_selectable = None
1851
1852 left_selectable = left_info.selectable
1853 if prop:
1854 adapt_from: Optional[FromClause]
1855 if sql_util.clause_is_present(on_selectable, left_selectable):
1856 adapt_from = on_selectable
1857 else:
1858 assert isinstance(left_selectable, FromClause)
1859 adapt_from = left_selectable
1860
1861 (
1862 pj,
1863 sj,
1864 source,
1865 dest,
1866 secondary,
1867 target_adapter,
1868 ) = prop._create_joins(
1869 source_selectable=adapt_from,
1870 dest_selectable=adapt_to,
1871 source_polymorphic=True,
1872 of_type_entity=right_info,
1873 alias_secondary=True,
1874 extra_criteria=_extra_criteria,
1875 )
1876
1877 if sj is not None:
1878 if isouter:
1879 # note this is an inner join from secondary->right
1880 right = sql.join(secondary, right, sj)
1881 onclause = pj
1882 else:
1883 left = sql.join(left, secondary, pj, isouter)
1884 onclause = sj
1885 else:
1886 onclause = pj
1887
1888 self._target_adapter = target_adapter
1889
1890 # we don't use the normal coercions logic for _ORMJoin
1891 # (probably should), so do some gymnastics to get the entity.
1892 # logic here is for #8721, which was a major bug in 1.4
1893 # for almost two years, not reported/fixed until 1.4.43 (!)
1894 if is_selectable(left_info):
1895 parententity = left_selectable._annotations.get(
1896 "parententity", None
1897 )
1898 elif insp_is_mapper(left_info) or insp_is_aliased_class(left_info):
1899 parententity = left_info
1900 else:
1901 parententity = None
1902
1903 if parententity is not None:
1904 self._annotations = self._annotations.union(
1905 {"parententity": parententity}
1906 )
1907
1908 augment_onclause = bool(_extra_criteria) and not prop
1909 expression.Join.__init__(self, left, right, onclause, isouter, full)
1910
1911 assert self.onclause is not None
1912
1913 if augment_onclause:
1914 self.onclause &= sql.and_(*_extra_criteria)
1915
1916 if (
1917 not prop
1918 and getattr(right_info, "mapper", None)
1919 and right_info.mapper.single # type: ignore
1920 ):
1921 right_info = cast("_InternalEntityType[Any]", right_info)
1922 # if single inheritance target and we are using a manual
1923 # or implicit ON clause, augment it the same way we'd augment the
1924 # WHERE.
1925 single_crit = right_info.mapper._single_table_criterion
1926 if single_crit is not None:
1927 if insp_is_aliased_class(right_info):
1928 single_crit = right_info._adapter.traverse(single_crit)
1929 self.onclause = self.onclause & single_crit
1930
1931 def _splice_into_center(self, other):
1932 """Splice a join into the center.
1933
1934 Given join(a, b) and join(b, c), return join(a, b).join(c)
1935
1936 """
1937 leftmost = other
1938 while isinstance(leftmost, sql.Join):
1939 leftmost = leftmost.left
1940
1941 assert self.right is leftmost
1942
1943 left = _ORMJoin(
1944 self.left,
1945 other.left,
1946 self.onclause,
1947 isouter=self.isouter,
1948 _left_memo=self._left_memo,
1949 _right_memo=None,
1950 )
1951
1952 return _ORMJoin(
1953 left,
1954 other.right,
1955 other.onclause,
1956 isouter=other.isouter,
1957 _right_memo=other._right_memo,
1958 )
1959
1960 def join(
1961 self,
1962 right: _FromClauseArgument,
1963 onclause: Optional[_OnClauseArgument] = None,
1964 isouter: bool = False,
1965 full: bool = False,
1966 ) -> _ORMJoin:
1967 return _ORMJoin(self, right, onclause, full=full, isouter=isouter)
1968
1969 def outerjoin(
1970 self,
1971 right: _FromClauseArgument,
1972 onclause: Optional[_OnClauseArgument] = None,
1973 full: bool = False,
1974 ) -> _ORMJoin:
1975 return _ORMJoin(self, right, onclause, isouter=True, full=full)
1976
1977
1978def with_parent(
1979 instance: object,
1980 prop: attributes.QueryableAttribute[Any],
1981 from_entity: Optional[_EntityType[Any]] = None,
1982) -> ColumnElement[bool]:
1983 """Create filtering criterion that relates this query's primary entity
1984 to the given related instance, using established
1985 :func:`_orm.relationship()`
1986 configuration.
1987
1988 E.g.::
1989
1990 stmt = select(Address).where(with_parent(some_user, User.addresses))
1991
1992
1993 The SQL rendered is the same as that rendered when a lazy loader
1994 would fire off from the given parent on that attribute, meaning
1995 that the appropriate state is taken from the parent object in
1996 Python without the need to render joins to the parent table
1997 in the rendered statement.
1998
1999 The given property may also make use of :meth:`_orm.PropComparator.of_type`
2000 to indicate the left side of the criteria::
2001
2002
2003 a1 = aliased(Address)
2004 a2 = aliased(Address)
2005 stmt = select(a1, a2).where(
2006 with_parent(u1, User.addresses.of_type(a2))
2007 )
2008
2009 The above use is equivalent to using the
2010 :func:`_orm.with_parent.from_entity` argument::
2011
2012 a1 = aliased(Address)
2013 a2 = aliased(Address)
2014 stmt = select(a1, a2).where(
2015 with_parent(u1, User.addresses, from_entity=a2)
2016 )
2017
2018 :param instance:
2019 An instance which has some :func:`_orm.relationship`.
2020
2021 :param property:
2022 Class-bound attribute, which indicates
2023 what relationship from the instance should be used to reconcile the
2024 parent/child relationship.
2025
2026 :param from_entity:
2027 Entity in which to consider as the left side. This defaults to the
2028 "zero" entity of the :class:`_query.Query` itself.
2029
2030 .. versionadded:: 1.2
2031
2032 """
2033 prop_t: RelationshipProperty[Any]
2034
2035 if isinstance(prop, str):
2036 raise sa_exc.ArgumentError(
2037 "with_parent() accepts class-bound mapped attributes, not strings"
2038 )
2039 elif isinstance(prop, attributes.QueryableAttribute):
2040 if prop._of_type:
2041 from_entity = prop._of_type
2042 mapper_property = prop.property
2043 if mapper_property is None or not prop_is_relationship(
2044 mapper_property
2045 ):
2046 raise sa_exc.ArgumentError(
2047 f"Expected relationship property for with_parent(), "
2048 f"got {mapper_property}"
2049 )
2050 prop_t = mapper_property
2051 else:
2052 prop_t = prop
2053
2054 return prop_t._with_parent(instance, from_entity=from_entity)
2055
2056
2057def has_identity(object_: object) -> bool:
2058 """Return True if the given object has a database
2059 identity.
2060
2061 This typically corresponds to the object being
2062 in either the persistent or detached state.
2063
2064 .. seealso::
2065
2066 :func:`.was_deleted`
2067
2068 """
2069 state = attributes.instance_state(object_)
2070 return state.has_identity
2071
2072
2073def was_deleted(object_: object) -> bool:
2074 """Return True if the given object was deleted
2075 within a session flush.
2076
2077 This is regardless of whether or not the object is
2078 persistent or detached.
2079
2080 .. seealso::
2081
2082 :attr:`.InstanceState.was_deleted`
2083
2084 """
2085
2086 state = attributes.instance_state(object_)
2087 return state.was_deleted
2088
2089
2090def _entity_corresponds_to(
2091 given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
2092) -> bool:
2093 """determine if 'given' corresponds to 'entity', in terms
2094 of an entity passed to Query that would match the same entity
2095 being referred to elsewhere in the query.
2096
2097 """
2098 if insp_is_aliased_class(entity):
2099 if insp_is_aliased_class(given):
2100 if entity._base_alias() is given._base_alias():
2101 return True
2102 return False
2103 elif insp_is_aliased_class(given):
2104 if given._use_mapper_path:
2105 return entity in given.with_polymorphic_mappers
2106 else:
2107 return entity is given
2108
2109 assert insp_is_mapper(given)
2110 return entity.common_parent(given)
2111
2112
2113def _entity_corresponds_to_use_path_impl(
2114 given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
2115) -> bool:
2116 """determine if 'given' corresponds to 'entity', in terms
2117 of a path of loader options where a mapped attribute is taken to
2118 be a member of a parent entity.
2119
2120 e.g.::
2121
2122 someoption(A).someoption(A.b) # -> fn(A, A) -> True
2123 someoption(A).someoption(C.d) # -> fn(A, C) -> False
2124
2125 a1 = aliased(A)
2126 someoption(a1).someoption(A.b) # -> fn(a1, A) -> False
2127 someoption(a1).someoption(a1.b) # -> fn(a1, a1) -> True
2128
2129 wp = with_polymorphic(A, [A1, A2])
2130 someoption(wp).someoption(A1.foo) # -> fn(wp, A1) -> False
2131 someoption(wp).someoption(wp.A1.foo) # -> fn(wp, wp.A1) -> True
2132
2133
2134 """
2135 if insp_is_aliased_class(given):
2136 return (
2137 insp_is_aliased_class(entity)
2138 and not entity._use_mapper_path
2139 and (given is entity or entity in given._with_polymorphic_entities)
2140 )
2141 elif not insp_is_aliased_class(entity):
2142 return given.isa(entity.mapper)
2143 else:
2144 return (
2145 entity._use_mapper_path
2146 and given in entity.with_polymorphic_mappers
2147 )
2148
2149
2150def _entity_isa(given: _InternalEntityType[Any], mapper: Mapper[Any]) -> bool:
2151 """determine if 'given' "is a" mapper, in terms of the given
2152 would load rows of type 'mapper'.
2153
2154 """
2155 if given.is_aliased_class:
2156 return mapper in given.with_polymorphic_mappers or given.mapper.isa(
2157 mapper
2158 )
2159 elif given.with_polymorphic_mappers:
2160 return mapper in given.with_polymorphic_mappers or given.isa(mapper)
2161 else:
2162 return given.isa(mapper)
2163
2164
2165def _getitem(iterable_query: Query[Any], item: Any) -> Any:
2166 """calculate __getitem__ in terms of an iterable query object
2167 that also has a slice() method.
2168
2169 """
2170
2171 def _no_negative_indexes():
2172 raise IndexError(
2173 "negative indexes are not accepted by SQL "
2174 "index / slice operators"
2175 )
2176
2177 if isinstance(item, slice):
2178 start, stop, step = util.decode_slice(item)
2179
2180 if (
2181 isinstance(stop, int)
2182 and isinstance(start, int)
2183 and stop - start <= 0
2184 ):
2185 return []
2186
2187 elif (isinstance(start, int) and start < 0) or (
2188 isinstance(stop, int) and stop < 0
2189 ):
2190 _no_negative_indexes()
2191
2192 res = iterable_query.slice(start, stop)
2193 if step is not None:
2194 return list(res)[None : None : item.step]
2195 else:
2196 return list(res)
2197 else:
2198 if item == -1:
2199 _no_negative_indexes()
2200 else:
2201 return list(iterable_query[item : item + 1])[0]
2202
2203
2204def _is_mapped_annotation(
2205 raw_annotation: _AnnotationScanType,
2206 cls: Type[Any],
2207 originating_cls: Type[Any],
2208) -> bool:
2209 try:
2210 annotated = de_stringify_annotation(
2211 cls, raw_annotation, originating_cls.__module__
2212 )
2213 except NameError:
2214 # in most cases, at least within our own tests, we can raise
2215 # here, which is more accurate as it prevents us from returning
2216 # false negatives. However, in the real world, try to avoid getting
2217 # involved with end-user annotations that have nothing to do with us.
2218 # see issue #8888 where we bypass using this function in the case
2219 # that we want to detect an unresolvable Mapped[] type.
2220 return False
2221 else:
2222 return is_origin_of_cls(annotated, _MappedAnnotationBase)
2223
2224
2225class _CleanupError(Exception):
2226 pass
2227
2228
2229def _cleanup_mapped_str_annotation(
2230 annotation: str, originating_module: str
2231) -> str:
2232 # fix up an annotation that comes in as the form:
2233 # 'Mapped[List[Address]]' so that it instead looks like:
2234 # 'Mapped[List["Address"]]' , which will allow us to get
2235 # "Address" as a string
2236
2237 # additionally, resolve symbols for these names since this is where
2238 # we'd have to do it
2239
2240 inner: Optional[Match[str]]
2241
2242 mm = re.match(r"^(.+?)\[(.+)\]$", annotation)
2243
2244 if not mm:
2245 return annotation
2246
2247 # ticket #8759. Resolve the Mapped name to a real symbol.
2248 # originally this just checked the name.
2249 try:
2250 obj = eval_name_only(mm.group(1), originating_module)
2251 except NameError as ne:
2252 raise _CleanupError(
2253 f'For annotation "{annotation}", could not resolve '
2254 f'container type "{mm.group(1)}". '
2255 "Please ensure this type is imported at the module level "
2256 "outside of TYPE_CHECKING blocks"
2257 ) from ne
2258
2259 if obj is typing.ClassVar:
2260 real_symbol = "ClassVar"
2261 else:
2262 try:
2263 if issubclass(obj, _MappedAnnotationBase):
2264 real_symbol = obj.__name__
2265 else:
2266 return annotation
2267 except TypeError:
2268 # avoid isinstance(obj, type) check, just catch TypeError
2269 return annotation
2270
2271 # note: if one of the codepaths above didn't define real_symbol and
2272 # then didn't return, real_symbol raises UnboundLocalError
2273 # which is actually a NameError, and the calling routines don't
2274 # notice this since they are catching NameError anyway. Just in case
2275 # this is being modified in the future, something to be aware of.
2276
2277 stack = []
2278 inner = mm
2279 while True:
2280 stack.append(real_symbol if mm is inner else inner.group(1))
2281 g2 = inner.group(2)
2282 inner = re.match(r"^(.+?)\[(.+)\]$", g2)
2283 if inner is None:
2284 stack.append(g2)
2285 break
2286
2287 # stacks we want to rewrite, that is, quote the last entry which
2288 # we think is a relationship class name:
2289 #
2290 # ['Mapped', 'List', 'Address']
2291 # ['Mapped', 'A']
2292 #
2293 # stacks we dont want to rewrite, which are generally MappedColumn
2294 # use cases:
2295 #
2296 # ['Mapped', "'Optional[Dict[str, str]]'"]
2297 # ['Mapped', 'dict[str, str] | None']
2298
2299 if (
2300 # avoid already quoted symbols such as
2301 # ['Mapped', "'Optional[Dict[str, str]]'"]
2302 not re.match(r"""^["'].*["']$""", stack[-1])
2303 # avoid further generics like Dict[] such as
2304 # ['Mapped', 'dict[str, str] | None']
2305 and not re.match(r".*\[.*\]", stack[-1])
2306 ):
2307 stripchars = "\"' "
2308 stack[-1] = ", ".join(
2309 f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",")
2310 )
2311
2312 annotation = "[".join(stack) + ("]" * (len(stack) - 1))
2313
2314 return annotation
2315
2316
2317def _extract_mapped_subtype(
2318 raw_annotation: Optional[_AnnotationScanType],
2319 cls: type,
2320 originating_module: str,
2321 key: str,
2322 attr_cls: Type[Any],
2323 required: bool,
2324 is_dataclass_field: bool,
2325 expect_mapped: bool = True,
2326 raiseerr: bool = True,
2327) -> Optional[Tuple[Union[_AnnotationScanType, str], Optional[type]]]:
2328 """given an annotation, figure out if it's ``Mapped[something]`` and if
2329 so, return the ``something`` part.
2330
2331 Includes error raise scenarios and other options.
2332
2333 """
2334
2335 if raw_annotation is None:
2336 if required:
2337 raise sa_exc.ArgumentError(
2338 f"Python typing annotation is required for attribute "
2339 f'"{cls.__name__}.{key}" when primary argument(s) for '
2340 f'"{attr_cls.__name__}" construct are None or not present'
2341 )
2342 return None
2343
2344 try:
2345 annotated = de_stringify_annotation(
2346 cls,
2347 raw_annotation,
2348 originating_module,
2349 str_cleanup_fn=_cleanup_mapped_str_annotation,
2350 )
2351 except _CleanupError as ce:
2352 raise sa_exc.ArgumentError(
2353 f"Could not interpret annotation {raw_annotation}. "
2354 "Check that it uses names that are correctly imported at the "
2355 "module level. See chained stack trace for more hints."
2356 ) from ce
2357 except NameError as ne:
2358 if raiseerr and "Mapped[" in raw_annotation: # type: ignore
2359 raise sa_exc.ArgumentError(
2360 f"Could not interpret annotation {raw_annotation}. "
2361 "Check that it uses names that are correctly imported at the "
2362 "module level. See chained stack trace for more hints."
2363 ) from ne
2364
2365 annotated = raw_annotation # type: ignore
2366
2367 if is_dataclass_field:
2368 return annotated, None
2369 else:
2370 if not hasattr(annotated, "__origin__") or not is_origin_of_cls(
2371 annotated, _MappedAnnotationBase
2372 ):
2373 if expect_mapped:
2374 if not raiseerr:
2375 return None
2376
2377 origin = getattr(annotated, "__origin__", None)
2378 if origin is typing.ClassVar:
2379 return None
2380
2381 # check for other kind of ORM descriptor like AssociationProxy,
2382 # don't raise for that (issue #9957)
2383 elif isinstance(origin, type) and issubclass(
2384 origin, ORMDescriptor
2385 ):
2386 return None
2387
2388 raise sa_exc.ArgumentError(
2389 f'Type annotation for "{cls.__name__}.{key}" '
2390 "can't be correctly interpreted for "
2391 "Annotated Declarative Table form. ORM annotations "
2392 "should normally make use of the ``Mapped[]`` generic "
2393 "type, or other ORM-compatible generic type, as a "
2394 "container for the actual type, which indicates the "
2395 "intent that the attribute is mapped. "
2396 "Class variables that are not intended to be mapped "
2397 "by the ORM should use ClassVar[]. "
2398 "To allow Annotated Declarative to disregard legacy "
2399 "annotations which don't use Mapped[] to pass, set "
2400 '"__allow_unmapped__ = True" on the class or a '
2401 "superclass this class.",
2402 code="zlpr",
2403 )
2404
2405 else:
2406 return annotated, None
2407
2408 if len(annotated.__args__) != 1:
2409 raise sa_exc.ArgumentError(
2410 "Expected sub-type for Mapped[] annotation"
2411 )
2412
2413 return (
2414 # fix dict/list/set args to be ForwardRef, see #11814
2415 fixup_container_fwd_refs(annotated.__args__[0]),
2416 annotated.__origin__,
2417 )
2418
2419
2420def _mapper_property_as_plain_name(prop: Type[Any]) -> str:
2421 if hasattr(prop, "_mapper_property_name"):
2422 name = prop._mapper_property_name()
2423 else:
2424 name = None
2425 return util.clsname_as_plain_name(prop, name)