1# orm/util.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9from __future__ import annotations
10
11import enum
12import functools
13import re
14import types
15import typing
16from typing import AbstractSet
17from typing import Any
18from typing import Callable
19from typing import cast
20from typing import Dict
21from typing import FrozenSet
22from typing import Generic
23from typing import Iterable
24from typing import Iterator
25from typing import List
26from typing import Match
27from typing import Optional
28from typing import Protocol
29from typing import Sequence
30from typing import Tuple
31from typing import Type
32from typing import TYPE_CHECKING
33from typing import TypeVar
34from typing import Union
35import weakref
36
37from . import attributes # noqa
38from . import exc
39from ._typing import _O
40from ._typing import insp_is_aliased_class
41from ._typing import insp_is_mapper
42from ._typing import prop_is_relationship
43from .base import _class_to_mapper as _class_to_mapper
44from .base import _MappedAnnotationBase
45from .base import _never_set as _never_set # noqa: F401
46from .base import _none_set as _none_set # noqa: F401
47from .base import attribute_str as attribute_str # noqa: F401
48from .base import class_mapper as class_mapper
49from .base import DynamicMapped
50from .base import InspectionAttr as InspectionAttr
51from .base import instance_str as instance_str # noqa: F401
52from .base import Mapped
53from .base import object_mapper as object_mapper
54from .base import object_state as object_state # noqa: F401
55from .base import opt_manager_of_class
56from .base import ORMDescriptor
57from .base import state_attribute_str as state_attribute_str # noqa: F401
58from .base import state_class_str as state_class_str # noqa: F401
59from .base import state_str as state_str # noqa: F401
60from .base import WriteOnlyMapped
61from .interfaces import CriteriaOption
62from .interfaces import MapperProperty as MapperProperty
63from .interfaces import ORMColumnsClauseRole
64from .interfaces import ORMEntityColumnsClauseRole
65from .interfaces import ORMFromClauseRole
66from .path_registry import PathRegistry as PathRegistry
67from .. import event
68from .. import exc as sa_exc
69from .. import inspection
70from .. import sql
71from .. import util
72from ..engine.result import result_tuple
73from ..sql import coercions
74from ..sql import expression
75from ..sql import lambdas
76from ..sql import roles
77from ..sql import util as sql_util
78from ..sql import visitors
79from ..sql._typing import is_selectable
80from ..sql.annotation import SupportsCloneAnnotations
81from ..sql.base import ColumnCollection
82from ..sql.cache_key import HasCacheKey
83from ..sql.cache_key import MemoizedHasCacheKey
84from ..sql.elements import ColumnElement
85from ..sql.elements import KeyedColumnElement
86from ..sql.selectable import FromClause
87from ..util.langhelpers import MemoizedSlots
88from ..util.typing import de_stringify_annotation as _de_stringify_annotation
89from ..util.typing import (
90 de_stringify_union_elements as _de_stringify_union_elements,
91)
92from ..util.typing import eval_name_only as _eval_name_only
93from ..util.typing import is_origin_of_cls
94from ..util.typing import Literal
95from ..util.typing import TupleAny
96from ..util.typing import typing_get_origin
97from ..util.typing import Unpack
98
99if typing.TYPE_CHECKING:
100 from ._typing import _EntityType
101 from ._typing import _IdentityKeyType
102 from ._typing import _InternalEntityType
103 from ._typing import _ORMCOLEXPR
104 from .context import _MapperEntity
105 from .context import ORMCompileState
106 from .mapper import Mapper
107 from .path_registry import AbstractEntityRegistry
108 from .query import Query
109 from .relationships import RelationshipProperty
110 from ..engine import Row
111 from ..engine import RowMapping
112 from ..sql._typing import _CE
113 from ..sql._typing import _ColumnExpressionArgument
114 from ..sql._typing import _EquivalentColumnMap
115 from ..sql._typing import _FromClauseArgument
116 from ..sql._typing import _OnClauseArgument
117 from ..sql._typing import _PropagateAttrsType
118 from ..sql.annotation import _SA
119 from ..sql.base import ReadOnlyColumnCollection
120 from ..sql.elements import BindParameter
121 from ..sql.selectable import _ColumnsClauseElement
122 from ..sql.selectable import Select
123 from ..sql.selectable import Selectable
124 from ..sql.visitors import anon_map
125 from ..util.typing import _AnnotationScanType
126 from ..util.typing import ArgsTypeProcotol
127
128_T = TypeVar("_T", bound=Any)
129
130all_cascades = frozenset(
131 (
132 "delete",
133 "delete-orphan",
134 "all",
135 "merge",
136 "expunge",
137 "save-update",
138 "refresh-expire",
139 "none",
140 )
141)
142
143
# Factory for pre-configured wrappers: ``_de_stringify_partial(fn)``
# evaluates to ``functools.partial(fn, locals_=<mapping below>)``, so the
# wrapped function always receives the ORM annotation container names
# (Mapped, WriteOnlyMapped, DynamicMapped) through its ``locals_`` keyword
# argument.
_de_stringify_partial = functools.partial(
    functools.partial,
    locals_=util.immutabledict(
        {
            "Mapped": Mapped,
            "WriteOnlyMapped": WriteOnlyMapped,
            "DynamicMapped": DynamicMapped,
        }
    ),
)
154
155# partial is practically useless as we have to write out the whole
156# function and maintain the signature anyway
157
158
class _DeStringifyAnnotation(Protocol):
    """Typing-only call signature for ``de_stringify_annotation``.

    The runtime object is a ``functools.partial`` that already supplies
    ``locals_``; this protocol spells out the remaining parameters so that
    type checkers see a complete signature.

    """

    def __call__(
        self,
        cls: Type[Any],
        annotation: _AnnotationScanType,
        originating_module: str,
        *,
        str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
        include_generic: bool = False,
    ) -> Type[Any]: ...


# the util-level function with ``locals_`` pre-bound via
# ``_de_stringify_partial``; cast() only changes the statically visible type.
de_stringify_annotation = cast(
    _DeStringifyAnnotation, _de_stringify_partial(_de_stringify_annotation)
)
174
175
class _DeStringifyUnionElements(Protocol):
    """Typing-only call signature for ``de_stringify_union_elements``.

    As with ``_DeStringifyAnnotation``'s counterpart, the runtime object is
    a ``functools.partial`` with ``locals_`` already bound; only the
    remaining parameters are declared here.

    """

    def __call__(
        self,
        cls: Type[Any],
        annotation: ArgsTypeProcotol,
        originating_module: str,
        *,
        str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
    ) -> Type[Any]: ...


# the util-level function with ``locals_`` pre-bound via
# ``_de_stringify_partial``; cast() only changes the statically visible type.
de_stringify_union_elements = cast(
    _DeStringifyUnionElements,
    _de_stringify_partial(_de_stringify_union_elements),
)
191
192
class _EvalNameOnly(Protocol):
    """Typing-only call signature for ``eval_name_only``; the ``locals_``
    argument is pre-bound by ``_de_stringify_partial`` and therefore not
    part of the visible signature."""

    def __call__(self, name: str, module_name: str) -> Any: ...


# the util-level function with ``locals_`` pre-bound; cast() is typing-only.
eval_name_only = cast(_EvalNameOnly, _de_stringify_partial(_eval_name_only))
198
199
class CascadeOptions(FrozenSet[str]):
    """Immutable set of the cascade names passed to
    :paramref:`.relationship.cascade`.

    Besides behaving as a frozenset of the effective cascade names, each
    instance carries one boolean attribute per individual cascade
    (``save_update``, ``delete``, ...) for direct checks.

    """

    # cascades implied by the "all" shorthand
    _add_w_all_cascades = all_cascades.difference(
        ["all", "none", "delete-orphan"]
    )
    _allowed_cascades = all_cascades

    _viewonly_cascades = ["expunge", "all", "none", "refresh-expire", "merge"]

    __slots__ = (
        "save_update",
        "delete",
        "refresh_expire",
        "merge",
        "expunge",
        "delete_orphan",
    )

    save_update: bool
    delete: bool
    refresh_expire: bool
    merge: bool
    expunge: bool
    delete_orphan: bool

    def __new__(
        cls, value_list: Optional[Union[Iterable[str], str]]
    ) -> CascadeOptions:
        """Build the option set from an iterable of names, a
        comma-separated string, or ``None`` (no cascades).

        :raises sqlalchemy.exc.ArgumentError: if any name is not one of
         the allowed cascades.
        """
        # strings and None are parsed by from_string(), which calls back
        # into this constructor with a list of names
        if value_list is None or isinstance(value_list, str):
            return cls.from_string(value_list)  # type: ignore

        values = set(value_list)
        unknown = values.difference(cls._allowed_cascades)
        if unknown:
            raise sa_exc.ArgumentError(
                "Invalid cascade option(s): %s"
                % ", ".join(repr(name) for name in sorted(unknown))
            )

        # expand / apply the shorthand tokens; neither is kept in the set
        if "all" in values:
            values.update(cls._add_w_all_cascades)
        if "none" in values:
            values.clear()
        values.discard("all")

        self = super().__new__(cls, values)
        self.save_update = "save-update" in values
        self.delete = "delete" in values
        self.refresh_expire = "refresh-expire" in values
        self.merge = "merge" in values
        self.expunge = "expunge" in values
        self.delete_orphan = "delete-orphan" in values

        if self.delete_orphan and not self.delete:
            util.warn("The 'delete-orphan' cascade option requires 'delete'.")
        return self

    def __repr__(self):
        joined = ",".join(sorted(self))
        return f"CascadeOptions({joined!r})"

    @classmethod
    def from_string(cls, arg):
        """Parse a comma-separated cascade string; ``None`` or an empty
        string produce an empty option set."""
        tokens = re.split(r"\s*,\s*", arg or "")
        return cls([token for token in tokens if token])
271
272
def _validator_events(desc, key, validator, include_removes, include_backrefs):
    """Attach attribute event listeners to ``desc`` that pass values being
    set or appended through ``validator``.

    ``validator`` is invoked as ``validator(obj, key, value)``; when
    ``include_removes`` is true it is invoked with a fourth boolean
    argument that is ``True`` only for remove events, and a "remove"
    listener is registered as well.  When ``include_backrefs`` is false,
    events whose initiator belongs to a different attribute
    implementation are passed through unvalidated.
    """

    def originates_here(state, initiator):
        # true when the event was initiated by this attribute itself
        # rather than arriving from another attribute (i.e. a backref)
        return state.manager[key].impl is initiator.impl

    if include_removes:

        def append(state, value, initiator):
            # bulk replacements are validated by bulk_set() instead
            if initiator.op is attributes.OP_BULK_REPLACE:
                return value
            if include_backrefs or originates_here(state, initiator):
                return validator(state.obj(), key, value, False)
            return value

        def bulk_set(state, values, initiator):
            if not include_backrefs and not originates_here(state, initiator):
                return
            obj = state.obj()
            # replace the contents in place so the caller's list is updated
            values[:] = [validator(obj, key, item, False) for item in values]

        def set_(state, value, oldvalue, initiator):
            if include_backrefs or originates_here(state, initiator):
                return validator(state.obj(), key, value, False)
            return value

        def remove(state, value, initiator):
            if include_backrefs or originates_here(state, initiator):
                validator(state.obj(), key, value, True)

    else:

        def append(state, value, initiator):
            # bulk replacements are validated by bulk_set() instead
            if initiator.op is attributes.OP_BULK_REPLACE:
                return value
            if include_backrefs or originates_here(state, initiator):
                return validator(state.obj(), key, value)
            return value

        def bulk_set(state, values, initiator):
            if not include_backrefs and not originates_here(state, initiator):
                return
            obj = state.obj()
            # replace the contents in place so the caller's list is updated
            values[:] = [validator(obj, key, item) for item in values]

        def set_(state, value, oldvalue, initiator):
            if include_backrefs or originates_here(state, initiator):
                return validator(state.obj(), key, value)
            return value

    event.listen(desc, "append", append, raw=True, retval=True)
    event.listen(desc, "bulk_replace", bulk_set, raw=True)
    event.listen(desc, "set", set_, raw=True, retval=True)
    if include_removes:
        event.listen(desc, "remove", remove, raw=True, retval=True)
337
338
def polymorphic_union(
    table_map, typecolname, aliasname="p_union", cast_nulls=True
):
    """Create a ``UNION`` statement used by a polymorphic mapper.

    See :ref:`concrete_inheritance` for an example of how
    this is used.

    Note that ``table_map`` is updated in place: each value is replaced
    by its coerced FROM clause form.

    :param table_map: mapping of polymorphic identities to
     :class:`_schema.Table` objects.
    :param typecolname: string name of a "discriminator" column, which will be
     derived from the query, producing the polymorphic identity for
     each row.  If ``None``, no polymorphic discriminator is generated.
    :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()`
     construct generated.
    :param cast_nulls: if True, non-existent columns, which are represented
     as labeled NULLs, will be passed into CAST.   This is a legacy behavior
     that is problematic on some backends such as Oracle - in which case it
     can be set to False.

    """

    colnames: util.OrderedSet[str] = util.OrderedSet()
    columns_by_table = {}
    type_by_colname = {}

    # first pass: coerce every entry and collect the union of column keys
    for key in table_map:
        selectable = coercions.expect(
            roles.StrictFromClauseRole, table_map[key], allow_select=True
        )
        table_map[key] = selectable

        by_key = {}
        for column in selectable.c:
            if column.key == typecolname:
                raise sa_exc.InvalidRequestError(
                    "Polymorphic union can't use '%s' as the discriminator "
                    "column due to mapped column %r; please apply the "
                    "'typecolname' "
                    "argument; this is available on "
                    "ConcreteBase as '_concrete_discriminator_name'"
                    % (typecolname, column)
                )
            colnames.add(column.key)
            by_key[column.key] = column
            type_by_colname[column.key] = column.type
        columns_by_table[selectable] = by_key

    def column_or_null(name, table):
        # a table lacking the column contributes a typed, labeled NULL
        try:
            return columns_by_table[table][name]
        except KeyError:
            make_null = sql.cast if cast_nulls else sql.type_coerce
            return make_null(sql.null(), type_by_colname[name]).label(name)

    # second pass: one SELECT per table, all with the same column list
    selects = []
    for type_, table in table_map.items():
        select_cols = [column_or_null(name, table) for name in colnames]
        if typecolname is not None:
            select_cols.append(
                sql.literal_column(sql_util._quote_ddl_expr(type_)).label(
                    typecolname
                )
            )
        selects.append(sql.select(*select_cols).select_from(table))

    return sql.union_all(*selects).alias(aliasname)
419
420
def identity_key(
    class_: Optional[Type[_T]] = None,
    ident: Union[Any, Tuple[Any, ...]] = None,
    *,
    instance: Optional[_T] = None,
    row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None,
    identity_token: Optional[Any] = None,
) -> _IdentityKeyType[_T]:
    r"""Generate "identity key" tuples, as are used as keys in the
    :attr:`.Session.identity_map` dictionary.

    This function has several call styles:

    * ``identity_key(class, ident, identity_token=token)``

      This form receives a mapped class and a primary key scalar or
      tuple as an argument.

      E.g.::

        >>> identity_key(MyClass, (1, 2))
        (<class '__main__.MyClass'>, (1, 2), None)

      :param class: mapped class (must be a positional argument)
      :param ident: primary key, may be a scalar or tuple argument.
      :param identity_token: optional identity token

        .. versionadded:: 1.2 added identity_token


    * ``identity_key(instance=instance)``

      This form will produce the identity key for a given instance.  The
      instance need not be persistent, only that its primary key attributes
      are populated (else the key will contain ``None`` for those missing
      values).

      E.g.::

        >>> instance = MyClass(1, 2)
        >>> identity_key(instance=instance)
        (<class '__main__.MyClass'>, (1, 2), None)

      In this form, the given instance is ultimately run through
      :meth:`_orm.Mapper.identity_key_from_instance`, which will have the
      effect of performing a database check for the corresponding row
      if the object is expired.

      :param instance: object instance (must be given as a keyword arg)

    * ``identity_key(class, row=row, identity_token=token)``

      This form is similar to the class/tuple form, except is passed a
      database result row as a :class:`.Row` or :class:`.RowMapping` object.

      E.g.::

        >>> row = engine.execute(\
            text("select * from table where a=1 and b=2")\
            ).first()
        >>> identity_key(MyClass, row=row)
        (<class '__main__.MyClass'>, (1, 2), None)

      :param class: mapped class (must be a positional argument)
      :param row: :class:`.Row` row returned by a :class:`_engine.CursorResult`
       (must be given as a keyword arg)
      :param identity_token: optional identity token

        .. versionadded:: 1.2 added identity_token

    """
    if class_ is None:
        # instance form: ident, row and identity_token are not consulted
        if instance is None:
            raise sa_exc.ArgumentError("class or instance is required")
        return object_mapper(instance).identity_key_from_instance(instance)

    mapper = class_mapper(class_)

    # a row takes precedence over ident when both are given
    if row is not None:
        return mapper.identity_key_from_row(
            row, identity_token=identity_token
        )

    if ident is None:
        raise sa_exc.ArgumentError("ident or row is required")

    return mapper.identity_key_from_primary_key(
        tuple(util.to_list(ident)), identity_token=identity_token
    )
509
510
class _TraceAdaptRole(enum.Enum):
    """Enumeration of all the use cases for ORMAdapter.

    ORMAdapter remains one of the most complicated aspects of the ORM, as it is
    used for in-place adaption of column expressions to be applied to a SELECT,
    replacing :class:`.Table` and other objects that are mapped to classes with
    aliases of those tables in the case of joined eager loading, or in the case
    of polymorphic loading as used with concrete mappings or other custom "with
    polymorphic" parameters, with whole user-defined subqueries. The
    enumerations provide an overview of all the use cases used by ORMAdapter, a
    layer of formality as to the introduction of new ORMAdapter use cases (of
    which none are anticipated), as well as a means to trace the origins of a
    particular ORMAdapter within runtime debugging.

    SQLAlchemy 2.0 has greatly scaled back ORM features which relied heavily on
    open-ended statement adaption, including the ``Query.with_polymorphic()``
    method and the ``Query.select_from_entity()`` methods, favoring
    user-explicit aliasing schemes using the ``aliased()`` and
    ``with_polymorphic()`` standalone constructs; these still use adaption,
    however the adaption is applied in a narrower scope.

    A member of this enumeration is stored as the ``role`` attribute of
    each :class:`.ORMAdapter` and :class:`.ORMStatementAdapter`.

    """

    # NOTE: member values come from enum.auto() and therefore follow
    # declaration order.

    # aliased() use case; adapts individual attributes at query
    # construction time
    ALIASED_INSP = enum.auto()

    # joinedload cases; typically adapt an ON clause of a relationship
    # join
    JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto()
    JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto()
    JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto()

    # polymorphic cases - these are complex ones that replace FROM
    # clauses, replacing tables with subqueries
    MAPPER_POLYMORPHIC_ADAPTER = enum.auto()
    WITH_POLYMORPHIC_ADAPTER = enum.auto()
    WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto()
    DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto()

    # the from_statement() case, used only to adapt individual attributes
    # from a given statement to local ORM attributes at result fetching
    # time.  assigned to ORMCompileState._from_obj_alias
    ADAPT_FROM_STATEMENT = enum.auto()

    # the joinedload case for queries that have LIMIT/OFFSET/DISTINCT;
    # the query is placed inside of a subquery with the LIMIT/OFFSET/etc.,
    # joinedloads are then placed on the outside.
    # assigned to ORMCompileState.compound_eager_adapter
    COMPOUND_EAGER_STATEMENT = enum.auto()

    # the legacy Query._set_select_from() case.
    # this is needed for Query's set operations (i.e. UNION, etc. )
    # as well as "legacy from_self()", which while removed from 2.0 as
    # public API, is used for the Query.count() method.  this one
    # still does full statement traversal
    # assigned to ORMCompileState._from_obj_alias
    LEGACY_SELECT_FROM_ALIAS = enum.auto()
569
570
class ORMStatementAdapter(sql_util.ColumnAdapter):
    """ColumnAdapter which includes a role attribute.

    ``role`` holds the :class:`._TraceAdaptRole` member given at
    construction; every other argument is forwarded unchanged to
    ``ColumnAdapter.__init__``.

    """

    __slots__ = ("role",)

    def __init__(
        self,
        role: _TraceAdaptRole,
        selectable: Selectable,
        *,
        equivalents: Optional[_EquivalentColumnMap] = None,
        adapt_required: bool = False,
        allow_label_resolve: bool = True,
        anonymize_labels: bool = False,
        adapt_on_names: bool = False,
        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
    ):
        # record which ORM use case created this adapter
        self.role = role
        super().__init__(
            selectable,
            equivalents=equivalents,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            adapt_on_names=adapt_on_names,
            adapt_from_selectables=adapt_from_selectables,
        )
598
599
class ORMAdapter(sql_util.ColumnAdapter):
    """ColumnAdapter subclass which excludes adaptation of entities from
    non-matching mappers.

    The adapter remembers the :class:`._TraceAdaptRole` it was created
    for, the mapper of the given entity, and, when the entity is an
    aliased class, its inspection object.

    """

    __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp")

    is_aliased_class: bool
    aliased_insp: Optional[AliasedInsp[Any]]

    def __init__(
        self,
        role: _TraceAdaptRole,
        entity: _InternalEntityType[Any],
        *,
        equivalents: Optional[_EquivalentColumnMap] = None,
        adapt_required: bool = False,
        allow_label_resolve: bool = True,
        anonymize_labels: bool = False,
        selectable: Optional[Selectable] = None,
        limit_on_entity: bool = True,
        adapt_on_names: bool = False,
        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
    ):
        self.role = role
        self.mapper = entity.mapper

        # default to the entity's own selectable as the adaptation target
        target = entity.selectable if selectable is None else selectable

        if insp_is_aliased_class(entity):
            self.is_aliased_class = True
            self.aliased_insp = entity
        else:
            self.is_aliased_class = False
            self.aliased_insp = None

        # restrict adaptation to related mappers unless disabled
        if limit_on_entity:
            include_fn = self._include_fn
        else:
            include_fn = None

        super().__init__(
            target,
            equivalents,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            include_fn=include_fn,
            adapt_on_names=adapt_on_names,
            adapt_from_selectables=adapt_from_selectables,
        )

    def _include_fn(self, elem):
        """Return a true value if ``elem`` has no "parentmapper"
        annotation, or if that mapper and this adapter's mapper are
        related in either direction per ``isa()``."""
        parent = elem._annotations.get("parentmapper", None)
        if not parent:
            return True
        return parent.isa(self.mapper) or self.mapper.isa(parent)
651
652
class AliasedClass(
    inspection.Inspectable["AliasedInsp[_O]"], ORMColumnsClauseRole[_O]
):
    r"""Represents an "aliased" form of a mapped class for usage with Query.

    The ORM equivalent of a :func:`~sqlalchemy.sql.expression.alias`
    construct, this object mimics the mapped class using a
    ``__getattr__`` scheme and maintains a reference to a
    real :class:`~sqlalchemy.sql.expression.Alias` object.

    A primary purpose of :class:`.AliasedClass` is to serve as an alternate
    within a SQL statement generated by the ORM, such that an existing
    mapped entity can be used in multiple contexts.   A simple example::

        # find all pairs of users with the same name
        user_alias = aliased(User)
        session.query(User, user_alias).\
                        join((user_alias, User.id > user_alias.id)).\
                        filter(User.name == user_alias.name)

    :class:`.AliasedClass` is also capable of mapping an existing mapped
    class to an entirely new selectable, provided this selectable is column-
    compatible with the existing mapped selectable, and it can also be
    configured in a mapping as the target of a :func:`_orm.relationship`.
    See the links below for examples.

    The :class:`.AliasedClass` object is constructed typically using the
    :func:`_orm.aliased` function.   It also is produced with additional
    configuration when using the :func:`_orm.with_polymorphic` function.

    The resulting object is an instance of :class:`.AliasedClass`.
    This object implements an attribute scheme which produces the
    same attribute and method interface as the original mapped
    class, allowing :class:`.AliasedClass` to be compatible
    with any attribute technique which works on the original class,
    including hybrid attributes (see :ref:`hybrids_toplevel`).

    The :class:`.AliasedClass` can be inspected for its underlying
    :class:`_orm.Mapper`, aliased selectable, and other information
    using :func:`_sa.inspect`::

        from sqlalchemy import inspect
        my_alias = aliased(MyClass)
        insp = inspect(my_alias)

    The resulting inspection object is an instance of :class:`.AliasedInsp`.


    .. seealso::

        :func:`.aliased`

        :func:`.with_polymorphic`

        :ref:`relationship_aliased_class`

        :ref:`relationship_to_window_function`


    """

    __name__: str

    def __init__(
        self,
        mapped_class_or_ac: _EntityType[_O],
        alias: Optional[FromClause] = None,
        name: Optional[str] = None,
        flat: bool = False,
        adapt_on_names: bool = False,
        with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]] = None,
        with_polymorphic_discriminator: Optional[ColumnElement[Any]] = None,
        base_alias: Optional[AliasedInsp[Any]] = None,
        use_mapper_path: bool = False,
        represents_outer_join: bool = False,
    ):
        insp = cast(
            "_InternalEntityType[_O]", inspection.inspect(mapped_class_or_ac)
        )
        mapper = insp.mapper

        if alias is not None:
            # an explicit selectable placed over an entity that is itself
            # aliased; the AliasedInsp will chain its adapter onto the
            # inner entity's adapter
            nest_adapters = bool(insp.is_aliased_class)
        else:
            nest_adapters = False
            if insp.is_aliased_class and insp.selectable._is_subquery:
                alias = insp.selectable.alias()
            else:
                alias = (
                    mapper._with_polymorphic_selectable._anonymous_fromclause(
                        name=name,
                        flat=flat,
                    )
                )

        assert alias is not None

        # fall back to the mapper's own polymorphic configuration
        poly_mappers = (
            with_polymorphic_mappers or mapper.with_polymorphic_mappers
        )
        if with_polymorphic_discriminator is not None:
            poly_on = with_polymorphic_discriminator
        else:
            poly_on = mapper.polymorphic_on

        self._aliased_insp = AliasedInsp(
            self,
            insp,
            alias,
            name,
            poly_mappers,
            poly_on,
            base_alias,
            use_mapper_path,
            adapt_on_names,
            represents_outer_join,
            nest_adapters,
        )

        self.__name__ = f"aliased({mapper.class_.__name__})"

    @classmethod
    def _reconstitute_from_aliased_insp(
        cls, aliased_insp: AliasedInsp[_O]
    ) -> AliasedClass[_O]:
        """Build an :class:`.AliasedClass` around an existing
        :class:`.AliasedInsp` without running ``__init__``, re-creating
        the per-subclass attributes of a with_polymorphic entity."""
        obj = cls.__new__(cls)
        obj.__name__ = f"aliased({aliased_insp.mapper.class_.__name__})"
        obj._aliased_insp = aliased_insp

        if not aliased_insp._is_with_polymorphic:
            return obj

        for sub_insp in aliased_insp._with_polymorphic_entities:
            if sub_insp is aliased_insp:
                continue
            sub_entity = AliasedClass._reconstitute_from_aliased_insp(
                sub_insp
            )
            setattr(obj, sub_insp.class_.__name__, sub_entity)

        return obj

    def __getattr__(self, key: str) -> Any:
        # read __dict__ directly; going through normal attribute access
        # for "_aliased_insp" would re-enter __getattr__ when it is unset
        try:
            aliased_insp = self.__dict__["_aliased_insp"]
        except KeyError:
            raise AttributeError()

        # maintain all getattr mechanics
        attr = getattr(aliased_insp._target, key)

        # attribute is a method, that will be invoked against a
        # "self"; so just return a new method with the same function and
        # new self
        if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
            return types.MethodType(attr.__func__, self)

        # attribute is a descriptor, that will be invoked against a
        # "self"; so invoke the descriptor against this self
        if hasattr(attr, "__get__"):
            attr = attr.__get__(None, self)

        # attributes within the QueryableAttribute system will want this
        # to be invoked so the object can be adapted; the adapted result
        # is stored on the instance so later lookups bypass __getattr__
        if hasattr(attr, "adapt_to_entity"):
            attr = attr.adapt_to_entity(aliased_insp)
            setattr(self, key, attr)

        return attr

    def _get_from_serialized(
        self, key: str, mapped_class: _O, aliased_insp: AliasedInsp[_O]
    ) -> Any:
        # this method is only used in terms of the
        # sqlalchemy.ext.serializer extension
        attr = getattr(mapped_class, key)

        # bound method: rebind the underlying function to this object
        if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
            return types.MethodType(attr.__func__, self)

        # attribute is a descriptor, that will be invoked against a
        # "self"; so invoke the descriptor against this self
        if hasattr(attr, "__get__"):
            attr = attr.__get__(None, self)

        # attributes within the QueryableAttribute system will want this
        # to be invoked so the object can be adapted
        if hasattr(attr, "adapt_to_entity"):
            # point the inspection object back at this entity first
            aliased_insp._weak_entity = weakref.ref(self)
            attr = attr.adapt_to_entity(aliased_insp)
            setattr(self, key, attr)

        return attr

    def __repr__(self) -> str:
        target_name = self._aliased_insp._target.__name__
        return f"<AliasedClass at 0x{id(self):x}; {target_name}>"

    def __str__(self) -> str:
        return str(self._aliased_insp)
852
853
854@inspection._self_inspects
855class AliasedInsp(
856 ORMEntityColumnsClauseRole[_O],
857 ORMFromClauseRole,
858 HasCacheKey,
859 InspectionAttr,
860 MemoizedSlots,
861 inspection.Inspectable["AliasedInsp[_O]"],
862 Generic[_O],
863):
864 """Provide an inspection interface for an
865 :class:`.AliasedClass` object.
866
867 The :class:`.AliasedInsp` object is returned
868 given an :class:`.AliasedClass` using the
869 :func:`_sa.inspect` function::
870
871 from sqlalchemy import inspect
872 from sqlalchemy.orm import aliased
873
874 my_alias = aliased(MyMappedClass)
875 insp = inspect(my_alias)
876
877 Attributes on :class:`.AliasedInsp`
878 include:
879
880 * ``entity`` - the :class:`.AliasedClass` represented.
881 * ``mapper`` - the :class:`_orm.Mapper` mapping the underlying class.
882 * ``selectable`` - the :class:`_expression.Alias`
883 construct which ultimately
884 represents an aliased :class:`_schema.Table` or
885 :class:`_expression.Select`
886 construct.
887 * ``name`` - the name of the alias. Also is used as the attribute
888 name when returned in a result tuple from :class:`_query.Query`.
889 * ``with_polymorphic_mappers`` - collection of :class:`_orm.Mapper`
890 objects
891 indicating all those mappers expressed in the select construct
892 for the :class:`.AliasedClass`.
893 * ``polymorphic_on`` - an alternate column or SQL expression which
894 will be used as the "discriminator" for a polymorphic load.
895
896 .. seealso::
897
898 :ref:`inspection_toplevel`
899
900 """
901
902 __slots__ = (
903 "__weakref__",
904 "_weak_entity",
905 "mapper",
906 "selectable",
907 "name",
908 "_adapt_on_names",
909 "with_polymorphic_mappers",
910 "polymorphic_on",
911 "_use_mapper_path",
912 "_base_alias",
913 "represents_outer_join",
914 "persist_selectable",
915 "local_table",
916 "_is_with_polymorphic",
917 "_with_polymorphic_entities",
918 "_adapter",
919 "_target",
920 "__clause_element__",
921 "_memoized_values",
922 "_all_column_expressions",
923 "_nest_adapters",
924 )
925
926 _cache_key_traversal = [
927 ("name", visitors.ExtendedInternalTraversal.dp_string),
928 ("_adapt_on_names", visitors.ExtendedInternalTraversal.dp_boolean),
929 ("_use_mapper_path", visitors.ExtendedInternalTraversal.dp_boolean),
930 ("_target", visitors.ExtendedInternalTraversal.dp_inspectable),
931 ("selectable", visitors.ExtendedInternalTraversal.dp_clauseelement),
932 (
933 "with_polymorphic_mappers",
934 visitors.InternalTraversal.dp_has_cache_key_list,
935 ),
936 ("polymorphic_on", visitors.InternalTraversal.dp_clauseelement),
937 ]
938
939 mapper: Mapper[_O]
940 selectable: FromClause
941 _adapter: ORMAdapter
942 with_polymorphic_mappers: Sequence[Mapper[Any]]
943 _with_polymorphic_entities: Sequence[AliasedInsp[Any]]
944
945 _weak_entity: weakref.ref[AliasedClass[_O]]
946 """the AliasedClass that refers to this AliasedInsp"""
947
948 _target: Union[Type[_O], AliasedClass[_O]]
949 """the thing referenced by the AliasedClass/AliasedInsp.
950
951 In the vast majority of cases, this is the mapped class. However
952 it may also be another AliasedClass (alias of alias).
953
954 """
955
    def __init__(
        self,
        entity: AliasedClass[_O],
        inspected: _InternalEntityType[_O],
        selectable: FromClause,
        name: Optional[str],
        with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]],
        polymorphic_on: Optional[ColumnElement[Any]],
        _base_alias: Optional[AliasedInsp[Any]],
        _use_mapper_path: bool,
        adapt_on_names: bool,
        represents_outer_join: bool,
        nest_adapters: bool,
    ):
        """Set up the inspection state for an aliased entity.

        :param entity: the :class:`.AliasedClass` described by this object;
         only a weak reference to it is stored.
        :param inspected: inspection object for the thing being aliased; its
         ``.entity`` becomes ``_target`` and its ``.mapper`` becomes
         ``mapper``.
        :param selectable: the FROM clause; also assigned to
         ``persist_selectable`` and ``local_table``.
        :param name: stored as ``name``.
        :param with_polymorphic_mappers: when non-empty, one additional
         :class:`.AliasedClass` is created against the same selectable for
         every mapper other than the primary one, and is set as an attribute
         on ``entity`` named after the mapped class.  When empty or None,
         the collection is just ``[mapper]``.
        :param polymorphic_on: stored as ``polymorphic_on``.
        :param _base_alias: the :class:`.AliasedInsp` recorded (weakly) as
         the base alias; this object itself is used when not given.
        :param _use_mapper_path: stored, and passed along to the
         per-subclass aliases.
        :param adapt_on_names: passed to the :class:`.ORMAdapter` and to the
         per-subclass aliases.
        :param represents_outer_join: stored as ``represents_outer_join``.
        :param nest_adapters: when true, ``inspected`` must itself be an
         :class:`.AliasedInsp`; its adapter is wrapped around the adapter
         built here.

        """
        mapped_class_or_ac = inspected.entity
        mapper = inspected.mapper

        # the entity must be in place before the polymorphic loop below,
        # which goes through self.entity
        self._weak_entity = weakref.ref(entity)
        self.mapper = mapper
        self.selectable = self.persist_selectable = self.local_table = (
            selectable
        )
        self.name = name
        self.polymorphic_on = polymorphic_on
        self._base_alias = weakref.ref(_base_alias or self)
        self._use_mapper_path = _use_mapper_path
        self.represents_outer_join = represents_outer_join
        self._nest_adapters = nest_adapters

        if with_polymorphic_mappers:
            self._is_with_polymorphic = True
            self.with_polymorphic_mappers = with_polymorphic_mappers
            self._with_polymorphic_entities = []
            for poly in self.with_polymorphic_mappers:
                if poly is not mapper:
                    # sub-alias for this subclass mapper, sharing our
                    # selectable and using this object as its base alias
                    ent = AliasedClass(
                        poly.class_,
                        selectable,
                        base_alias=self,
                        adapt_on_names=adapt_on_names,
                        use_mapper_path=_use_mapper_path,
                    )

                    # reachable afterwards as <entity>.<SubclassName>
                    setattr(self.entity, poly.class_.__name__, ent)
                    self._with_polymorphic_entities.append(ent._aliased_insp)

        else:
            self._is_with_polymorphic = False
            self.with_polymorphic_mappers = [mapper]

        self._adapter = ORMAdapter(
            _TraceAdaptRole.ALIASED_INSP,
            mapper,
            selectable=selectable,
            equivalents=mapper._equivalent_columns,
            adapt_on_names=adapt_on_names,
            anonymize_labels=True,
            # make sure the adapter doesn't try to grab other tables that
            # are not even the thing we are mapping, such as embedded
            # selectables in subqueries or CTEs.  See issue #6060
            # (the set below is empty whenever adapt_on_names is true)
            adapt_from_selectables={
                m.selectable
                for m in self.with_polymorphic_mappers
                if not adapt_on_names
            },
            limit_on_entity=False,
        )

        if nest_adapters:
            # supports "aliased class of aliased class" use case
            assert isinstance(inspected, AliasedInsp)
            self._adapter = inspected._adapter.wrap(self._adapter)

        self._adapt_on_names = adapt_on_names
        self._target = mapped_class_or_ac
1031
1032 @classmethod
1033 def _alias_factory(
1034 cls,
1035 element: Union[_EntityType[_O], FromClause],
1036 alias: Optional[FromClause] = None,
1037 name: Optional[str] = None,
1038 flat: bool = False,
1039 adapt_on_names: bool = False,
1040 ) -> Union[AliasedClass[_O], FromClause]:
1041 if isinstance(element, FromClause):
1042 if adapt_on_names:
1043 raise sa_exc.ArgumentError(
1044 "adapt_on_names only applies to ORM elements"
1045 )
1046 if name:
1047 return element.alias(name=name, flat=flat)
1048 else:
1049 return coercions.expect(
1050 roles.AnonymizedFromClauseRole, element, flat=flat
1051 )
1052 else:
1053 return AliasedClass(
1054 element,
1055 alias=alias,
1056 flat=flat,
1057 name=name,
1058 adapt_on_names=adapt_on_names,
1059 )
1060
1061 @classmethod
1062 def _with_polymorphic_factory(
1063 cls,
1064 base: Union[Type[_O], Mapper[_O]],
1065 classes: Union[Literal["*"], Iterable[_EntityType[Any]]],
1066 selectable: Union[Literal[False, None], FromClause] = False,
1067 flat: bool = False,
1068 polymorphic_on: Optional[ColumnElement[Any]] = None,
1069 aliased: bool = False,
1070 innerjoin: bool = False,
1071 adapt_on_names: bool = False,
1072 name: Optional[str] = None,
1073 _use_mapper_path: bool = False,
1074 ) -> AliasedClass[_O]:
1075 primary_mapper = _class_to_mapper(base)
1076
1077 if selectable not in (None, False) and flat:
1078 raise sa_exc.ArgumentError(
1079 "the 'flat' and 'selectable' arguments cannot be passed "
1080 "simultaneously to with_polymorphic()"
1081 )
1082
1083 mappers, selectable = primary_mapper._with_polymorphic_args(
1084 classes, selectable, innerjoin=innerjoin
1085 )
1086 if aliased or flat:
1087 assert selectable is not None
1088 selectable = selectable._anonymous_fromclause(flat=flat)
1089
1090 return AliasedClass(
1091 base,
1092 selectable,
1093 name=name,
1094 with_polymorphic_mappers=mappers,
1095 adapt_on_names=adapt_on_names,
1096 with_polymorphic_discriminator=polymorphic_on,
1097 use_mapper_path=_use_mapper_path,
1098 represents_outer_join=not innerjoin,
1099 )
1100
1101 @property
1102 def entity(self) -> AliasedClass[_O]:
1103 # to eliminate reference cycles, the AliasedClass is held weakly.
1104 # this produces some situations where the AliasedClass gets lost,
1105 # particularly when one is created internally and only the AliasedInsp
1106 # is passed around.
1107 # to work around this case, we just generate a new one when we need
1108 # it, as it is a simple class with very little initial state on it.
1109 ent = self._weak_entity()
1110 if ent is None:
1111 ent = AliasedClass._reconstitute_from_aliased_insp(self)
1112 self._weak_entity = weakref.ref(ent)
1113 return ent
1114
1115 is_aliased_class = True
1116 "always returns True"
1117
1118 def _memoized_method___clause_element__(self) -> FromClause:
1119 return self.selectable._annotate(
1120 {
1121 "parentmapper": self.mapper,
1122 "parententity": self,
1123 "entity_namespace": self,
1124 }
1125 )._set_propagate_attrs(
1126 {"compile_state_plugin": "orm", "plugin_subject": self}
1127 )
1128
1129 @property
1130 def entity_namespace(self) -> AliasedClass[_O]:
1131 return self.entity
1132
1133 @property
1134 def class_(self) -> Type[_O]:
1135 """Return the mapped class ultimately represented by this
1136 :class:`.AliasedInsp`."""
1137 return self.mapper.class_
1138
1139 @property
1140 def _path_registry(self) -> AbstractEntityRegistry:
1141 if self._use_mapper_path:
1142 return self.mapper._path_registry
1143 else:
1144 return PathRegistry.per_mapper(self)
1145
1146 def __getstate__(self) -> Dict[str, Any]:
1147 return {
1148 "entity": self.entity,
1149 "mapper": self.mapper,
1150 "alias": self.selectable,
1151 "name": self.name,
1152 "adapt_on_names": self._adapt_on_names,
1153 "with_polymorphic_mappers": self.with_polymorphic_mappers,
1154 "with_polymorphic_discriminator": self.polymorphic_on,
1155 "base_alias": self._base_alias(),
1156 "use_mapper_path": self._use_mapper_path,
1157 "represents_outer_join": self.represents_outer_join,
1158 "nest_adapters": self._nest_adapters,
1159 }
1160
1161 def __setstate__(self, state: Dict[str, Any]) -> None:
1162 self.__init__( # type: ignore
1163 state["entity"],
1164 state["mapper"],
1165 state["alias"],
1166 state["name"],
1167 state["with_polymorphic_mappers"],
1168 state["with_polymorphic_discriminator"],
1169 state["base_alias"],
1170 state["use_mapper_path"],
1171 state["adapt_on_names"],
1172 state["represents_outer_join"],
1173 state["nest_adapters"],
1174 )
1175
1176 def _merge_with(self, other: AliasedInsp[_O]) -> AliasedInsp[_O]:
1177 # assert self._is_with_polymorphic
1178 # assert other._is_with_polymorphic
1179
1180 primary_mapper = other.mapper
1181
1182 assert self.mapper is primary_mapper
1183
1184 our_classes = util.to_set(
1185 mp.class_ for mp in self.with_polymorphic_mappers
1186 )
1187 new_classes = {mp.class_ for mp in other.with_polymorphic_mappers}
1188 if our_classes == new_classes:
1189 return other
1190 else:
1191 classes = our_classes.union(new_classes)
1192
1193 mappers, selectable = primary_mapper._with_polymorphic_args(
1194 classes, None, innerjoin=not other.represents_outer_join
1195 )
1196 selectable = selectable._anonymous_fromclause(flat=True)
1197 return AliasedClass(
1198 primary_mapper,
1199 selectable,
1200 with_polymorphic_mappers=mappers,
1201 with_polymorphic_discriminator=other.polymorphic_on,
1202 use_mapper_path=other._use_mapper_path,
1203 represents_outer_join=other.represents_outer_join,
1204 )._aliased_insp
1205
1206 def _adapt_element(
1207 self, expr: _ORMCOLEXPR, key: Optional[str] = None
1208 ) -> _ORMCOLEXPR:
1209 assert isinstance(expr, ColumnElement)
1210 d: Dict[str, Any] = {
1211 "parententity": self,
1212 "parentmapper": self.mapper,
1213 }
1214 if key:
1215 d["proxy_key"] = key
1216
1217 # IMO mypy should see this one also as returning the same type
1218 # we put into it, but it's not
1219 return (
1220 self._adapter.traverse(expr)
1221 ._annotate(d)
1222 ._set_propagate_attrs(
1223 {"compile_state_plugin": "orm", "plugin_subject": self}
1224 )
1225 )
1226
    if TYPE_CHECKING:
        # establish compatibility with the _ORMAdapterProto protocol,
        # which in turn is compatible with _CoreAdapterProto.
        # this branch only declares a signature; it has no implementation.

        def _orm_adapt_element(
            self,
            obj: _CE,
            key: Optional[str] = None,
        ) -> _CE: ...

    else:
        # outside of type checking, the name is bound to the very same
        # function as _adapt_element.
        _orm_adapt_element = _adapt_element
1239
1240 def _entity_for_mapper(self, mapper):
1241 self_poly = self.with_polymorphic_mappers
1242 if mapper in self_poly:
1243 if mapper is self.mapper:
1244 return self
1245 else:
1246 return getattr(
1247 self.entity, mapper.class_.__name__
1248 )._aliased_insp
1249 elif mapper.isa(self.mapper):
1250 return self
1251 else:
1252 assert False, "mapper %s doesn't correspond to %s" % (mapper, self)
1253
1254 def _memoized_attr__get_clause(self):
1255 onclause, replacemap = self.mapper._get_clause
1256 return (
1257 self._adapter.traverse(onclause),
1258 {
1259 self._adapter.traverse(col): param
1260 for col, param in replacemap.items()
1261 },
1262 )
1263
1264 def _memoized_attr__memoized_values(self):
1265 return {}
1266
1267 def _memoized_attr__all_column_expressions(self):
1268 if self._is_with_polymorphic:
1269 cols_plus_keys = self.mapper._columns_plus_keys(
1270 [ent.mapper for ent in self._with_polymorphic_entities]
1271 )
1272 else:
1273 cols_plus_keys = self.mapper._columns_plus_keys()
1274
1275 cols_plus_keys = [
1276 (key, self._adapt_element(col)) for key, col in cols_plus_keys
1277 ]
1278
1279 return ColumnCollection(cols_plus_keys)
1280
1281 def _memo(self, key, callable_, *args, **kw):
1282 if key in self._memoized_values:
1283 return self._memoized_values[key]
1284 else:
1285 self._memoized_values[key] = value = callable_(*args, **kw)
1286 return value
1287
1288 def __repr__(self):
1289 if self.with_polymorphic_mappers:
1290 with_poly = "(%s)" % ", ".join(
1291 mp.class_.__name__ for mp in self.with_polymorphic_mappers
1292 )
1293 else:
1294 with_poly = ""
1295 return "<AliasedInsp at 0x%x; %s%s>" % (
1296 id(self),
1297 self.class_.__name__,
1298 with_poly,
1299 )
1300
1301 def __str__(self):
1302 if self._is_with_polymorphic:
1303 return "with_polymorphic(%s, [%s])" % (
1304 self._target.__name__,
1305 ", ".join(
1306 mp.class_.__name__
1307 for mp in self.with_polymorphic_mappers
1308 if mp is not self.mapper
1309 ),
1310 )
1311 else:
1312 return "aliased(%s)" % (self._target.__name__,)
1313
1314
class _WrapUserEntity:
    """Proxy handed to the loader_criteria lambda in place of the entity.

    Attribute access is forwarded to the wrapped subject, except that a
    ``declared_attr`` found directly in the subject's ``__dict__`` is invoked
    through its ``fget`` instead of going through the descriptor.  This
    bypasses declared_attr descriptors on unmapped mixins, which normally
    emit a warning for such use.

    might also be useful for other per-lambda instrumentations should
    the need arise.

    """

    __slots__ = ("subject",)

    def __init__(self, subject):
        self.subject = subject

    @util.preload_module("sqlalchemy.orm.decl_api")
    def __getattribute__(self, name):
        decl_api = util.preloaded.orm.decl_api

        # go through object.__getattribute__ so that fetching the subject
        # does not re-enter this method
        wrapped = object.__getattribute__(self, "subject")
        own_namespace = wrapped.__dict__

        if name in own_namespace:
            member = own_namespace[name]
            if isinstance(member, decl_api.declared_attr):
                return member.fget(wrapped)

        return getattr(wrapped, name)
1341
1342
class LoaderCriteriaOption(CriteriaOption):
    """Add additional WHERE criteria to the load for all occurrences of
    a particular entity.

    :class:`_orm.LoaderCriteriaOption` is invoked using the
    :func:`_orm.with_loader_criteria` function; see that function for
    details.

    .. versionadded:: 1.4

    """

    __slots__ = (
        "root_entity",
        "entity",
        "deferred_where_criteria",
        "where_criteria",
        "_where_crit_orig",
        "include_aliases",
        "propagate_to_loaders",
    )

    _traverse_internals = [
        ("root_entity", visitors.ExtendedInternalTraversal.dp_plain_obj),
        ("entity", visitors.ExtendedInternalTraversal.dp_has_cache_key),
        ("where_criteria", visitors.InternalTraversal.dp_clauseelement),
        ("include_aliases", visitors.InternalTraversal.dp_boolean),
        ("propagate_to_loaders", visitors.InternalTraversal.dp_boolean),
    ]

    root_entity: Optional[Type[Any]]
    entity: Optional[_InternalEntityType[Any]]
    where_criteria: Union[ColumnElement[bool], lambdas.DeferredLambdaElement]
    deferred_where_criteria: bool
    include_aliases: bool
    propagate_to_loaders: bool

    _where_crit_orig: Any

    def __init__(
        self,
        entity_or_base: _EntityType[Any],
        where_criteria: Union[
            _ColumnExpressionArgument[bool],
            Callable[[Any], _ColumnExpressionArgument[bool]],
        ],
        loader_only: bool = False,
        include_aliases: bool = False,
        propagate_to_loaders: bool = True,
        track_closure_variables: bool = True,
    ):
        """Construct the option.

        :param entity_or_base: an inspectable ORM entity, or any other
         class; the latter is stored as ``root_entity`` and its subclasses
         are searched for mapped classes later on.
        :param where_criteria: a SQL expression, or a callable that accepts
         the entity and returns one; a callable is wrapped in a deferred
         lambda element.
        :param loader_only: accepted, but not consulted by this
         constructor.
        :param include_aliases: stored as ``include_aliases``.
        :param propagate_to_loaders: stored as ``propagate_to_loaders``.
        :param track_closure_variables: passed to the lambda options when
         ``where_criteria`` is a callable.

        """
        inspected = cast(
            "_InternalEntityType[Any]",
            inspection.inspect(entity_or_base, False),
        )
        if inspected is not None:
            self.root_entity = None
            self.entity = inspected
        else:
            self.root_entity = cast("Type[Any]", entity_or_base)
            self.entity = None

        # the untouched argument is what gets pickled; see __reduce__
        self._where_crit_orig = where_criteria

        if not callable(where_criteria):
            self.deferred_where_criteria = False
            self.where_criteria = coercions.expect(
                roles.WhereHavingRole, where_criteria
            )
        else:
            if self.root_entity is None:
                assert inspected is not None
                lambda_subject = inspected.entity
            else:
                lambda_subject = self.root_entity

            self.deferred_where_criteria = True
            self.where_criteria = lambdas.DeferredLambdaElement(
                where_criteria,
                roles.WhereHavingRole,
                lambda_args=(_WrapUserEntity(lambda_subject),),
                opts=lambdas.LambdaOptions(
                    track_closure_variables=track_closure_variables
                ),
            )

        self.include_aliases = include_aliases
        self.propagate_to_loaders = propagate_to_loaders

    @classmethod
    def _unreduce(
        cls, entity, where_criteria, include_aliases, propagate_to_loaders
    ):
        """Rebuild an option from the arguments emitted by
        ``__reduce__``."""
        rebuilt = LoaderCriteriaOption(
            entity,
            where_criteria,
            include_aliases=include_aliases,
            propagate_to_loaders=propagate_to_loaders,
        )
        return rebuilt

    def __reduce__(self):
        """Pickle as a call to ``_unreduce`` with the target class, the
        original criteria argument and the two boolean flags."""
        target = self.entity.class_ if self.entity else self.root_entity
        constructor_args = (
            target,
            self._where_crit_orig,
            self.include_aliases,
            self.propagate_to_loaders,
        )
        return (LoaderCriteriaOption._unreduce, constructor_args)

    def _all_mappers(self) -> Iterator[Mapper[Any]]:
        """Iterate over every mapper this option applies to.

        With an inspected entity, that is its mapper plus descendants.
        Otherwise the subclasses of ``root_entity`` are walked
        first-in-first-out; a subclass that inspects to something truthy
        contributes its mapper plus descendants, any other subclass has its
        own subclasses queued.

        """
        if self.entity:
            yield from self.entity.mapper.self_and_descendants
            return

        assert self.root_entity
        pending = list(self.root_entity.__subclasses__())
        while pending:
            candidate = pending.pop(0)
            insp = cast(
                "_InternalEntityType[Any]",
                inspection.inspect(candidate, raiseerr=False),
            )
            if not insp:
                pending.extend(candidate.__subclasses__())
                continue
            yield from insp.mapper.self_and_descendants

    def _should_include(self, compile_state: ORMCompileState) -> bool:
        """Return False only when the statement being compiled is annotated
        with this very option under "for_loader_criteria"."""
        origin = compile_state.select_statement._annotations.get(
            "for_loader_criteria", None
        )
        return origin is not self

    def _resolve_where_criteria(
        self, ext_info: _InternalEntityType[Any]
    ) -> ColumnElement[bool]:
        """Return the criteria for ``ext_info``, deep-annotated with this
        option under "for_loader_criteria".

        Deferred (lambda) criteria are resolved against ``ext_info.entity``
        first.

        """
        if not self.deferred_where_criteria:
            crit = self.where_criteria  # type: ignore
        else:
            crit = cast(
                "ColumnElement[bool]",
                self.where_criteria._resolve_with_args(ext_info.entity),
            )

        assert isinstance(crit, ColumnElement)
        return sql_util._deep_annotate(
            crit,
            {"for_loader_criteria": self},
            detect_subquery_cols=True,
            ind_cols_on_fromclause=True,
        )

    def process_compile_state_replaced_entities(
        self,
        compile_state: ORMCompileState,
        mapper_entities: Iterable[_MapperEntity],
    ) -> None:
        """Delegate to :meth:`process_compile_state`; ``mapper_entities``
        is not used."""
        self.process_compile_state(compile_state)

    def process_compile_state(self, compile_state: ORMCompileState) -> None:
        """Apply a modification to a given :class:`.CompileState`."""

        # if options to limit the criteria to immediate query only,
        # use compile_state.attributes instead

        self.get_global_criteria(compile_state.global_attributes)

    def get_global_criteria(self, attributes: Dict[Any, Any]) -> None:
        """Append this option to the "additional_entity_criteria" list of
        every applicable mapper inside ``attributes``, creating the list
        where it does not exist yet."""
        for mp in self._all_mappers():
            criteria_key = ("additional_entity_criteria", mp)
            attributes.setdefault(criteria_key, []).append(self)
1520
1521
# register, for AliasedClass objects, an inspection function that simply
# hands back the object's ``_aliased_insp`` attribute.
inspection._inspects(AliasedClass)(lambda target: target._aliased_insp)
1523
1524
@inspection._inspects(type)
def _inspect_mc(
    class_: Type[_O],
) -> Optional[Mapper[_O]]:
    """Inspection function for plain classes.

    Returns the mapper of ``class_``, or None when the class has no class
    manager, is not mapped, or ``exc.NO_STATE`` is raised while looking
    either of those up.

    """
    try:
        manager = opt_manager_of_class(class_)
        if manager is not None and manager.is_mapped:
            return manager.mapper
    except exc.NO_STATE:
        pass
    return None
1538
1539
# the runtime type of a subscripted typing generic, taken from ``List[Any]``
GenericAlias = type(List[Any])
1541
1542
@inspection._inspects(GenericAlias)
def _inspect_generic_alias(
    class_: Type[_O],
) -> Optional[Mapper[_O]]:
    """Inspection function for subscripted generics: inspect the origin
    class of the generic alias the same way a plain class is inspected."""
    return _inspect_mc(cast("Type[_O]", typing_get_origin(class_)))
1549
1550
@inspection._self_inspects
class Bundle(
    ORMColumnsClauseRole[_T],
    SupportsCloneAnnotations,
    MemoizedHasCacheKey,
    inspection.Inspectable["Bundle[_T]"],
    InspectionAttr,
):
    """A grouping of SQL expressions that are returned by a :class:`.Query`
    under one namespace.

    A :class:`.Bundle` makes it possible to nest the tuple-based results
    returned by a column-oriented :class:`_query.Query` object.  It can
    also be extended by simple subclassing; the main thing to override is
    how the set of expressions is returned, which allows post-processing
    as well as custom return types, without involving ORM identity-mapped
    classes.

    .. seealso::

        :ref:`bundles`


    """

    single_entity = False
    """If True, queries for a single Bundle will be returned as a single
    entity, rather than an element within a keyed tuple."""

    is_clause_element = False

    is_mapper = False

    is_aliased_class = False

    is_bundle = True

    _propagate_attrs: _PropagateAttrsType = util.immutabledict()

    proxy_set = util.EMPTY_SET  # type: ignore

    exprs: List[_ColumnsClauseElement]

    def __init__(
        self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
    ):
        r"""Construct a new :class:`.Bundle`.

        e.g.::

            bn = Bundle("mybundle", MyClass.x, MyClass.y)

            for row in session.query(bn).filter(
                    bn.c.x == 5).filter(bn.c.y == 4):
                print(row.mybundle.x, row.mybundle.y)

        :param name: name of the bundle.
        :param \*exprs: columns or SQL expressions comprising the bundle.
        :param single_entity=False: if True, rows for this :class:`.Bundle`
         can be returned as a "single entity" outside of any enclosing tuple
         in the same manner as a mapped entity.

        """
        self.name = self._label = name

        coerced = []
        for raw_expr in exprs:
            coerced.append(
                coercions.expect(
                    roles.ColumnsClauseRole,
                    raw_expr,
                    apply_propagate_attrs=self,
                )
            )
        self.exprs = coerced

        # an expression annotated with "bundle" is represented in the
        # collection by that bundle rather than by the expression itself
        members = [e._annotations.get("bundle", e) for e in coerced]
        keyed_members = (
            (getattr(col, "key", col._label), col) for col in members
        )
        self.c = self.columns = ColumnCollection(keyed_members).as_readonly()

        self.single_entity = kw.pop("single_entity", self.single_entity)

    def _gen_cache_key(
        self, anon_map: anon_map, bindparams: List[BindParameter[Any]]
    ) -> Tuple[Any, ...]:
        """Cache key: the class, name and ``single_entity`` flag, followed
        by the cache key of each contained expression."""
        identity = (self.__class__, self.name, self.single_entity)
        expr_keys = tuple(
            [expr._gen_cache_key(anon_map, bindparams) for expr in self.exprs]
        )
        return identity + expr_keys

    @property
    def mapper(self) -> Optional[Mapper[Any]]:
        """The "parentmapper" annotation of the first expression, or None
        if that expression carries no such annotation."""
        first_annotations = self.exprs[0]._annotations
        mp: Optional[Mapper[Any]] = first_annotations.get(
            "parentmapper", None
        )
        return mp

    @property
    def entity(self) -> Optional[_InternalEntityType[Any]]:
        """The "parententity" annotation of the first expression, or None
        if that expression carries no such annotation."""
        first_annotations = self.exprs[0]._annotations
        ie: Optional[_InternalEntityType[Any]] = first_annotations.get(
            "parententity", None
        )
        return ie

    @property
    def entity_namespace(
        self,
    ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]:
        """The namespace for attribute lookups; the same collection as
        :attr:`.Bundle.c`."""
        return self.c

    columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]

    """A namespace of SQL expressions referred to by this :class:`.Bundle`.

        e.g.::

            bn = Bundle("mybundle", MyClass.x, MyClass.y)

            q = sess.query(bn).filter(bn.c.x == 5)

        Nesting of bundles is also supported::

            b1 = Bundle("b1",
                    Bundle('b2', MyClass.a, MyClass.b),
                    Bundle('b3', MyClass.x, MyClass.y)
                )

            q = sess.query(b1).filter(
                b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9)

    .. seealso::

        :attr:`.Bundle.c`

    """

    c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
    """An alias for :attr:`.Bundle.columns`."""

    def _clone(self):
        """Return a shallow copy: a new instance of the same class, made
        without calling ``__init__``, sharing this object's attribute
        values."""
        bundle_cls = self.__class__
        duplicate = bundle_cls.__new__(bundle_cls)
        duplicate.__dict__.update(self.__dict__)
        return duplicate

    def __clause_element__(self):
        """Return a non-grouping ``ClauseList`` of the bundle's members,
        annotated with this bundle and bound to the "orm" compile state
        plugin."""
        # ensure existing entity_namespace remains
        annotations = {"bundle": self, "entity_namespace": self}
        annotations.update(self._annotations)

        plugin_subject = self.exprs[0]._propagate_attrs.get(
            "plugin_subject", self.entity
        )

        members = [e._annotations.get("bundle", e) for e in self.exprs]
        clause_list = expression.ClauseList(
            *members,
            _literal_as_text_role=roles.ColumnsClauseRole,
            group=False,
        )

        # the Bundle *must* use the orm plugin no matter what.  the
        # subject can be None but it's much better if it's not.
        return clause_list._annotate(annotations)._set_propagate_attrs(
            {
                "compile_state_plugin": "orm",
                "plugin_subject": plugin_subject,
            }
        )

    @property
    def clauses(self):
        """The ``clauses`` of the construct returned by
        ``__clause_element__()``."""
        element = self.__clause_element__()
        return element.clauses

    def label(self, name):
        """Provide a copy of this :class:`.Bundle` passing a new label."""

        relabeled = self._clone()
        relabeled.name = name
        return relabeled

    def create_row_processor(
        self,
        query: Select[Unpack[TupleAny]],
        procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]],
        labels: Sequence[str],
    ) -> Callable[[Row[Unpack[TupleAny]]], Any]:
        """Produce the "row processing" function for this :class:`.Bundle`.

        May be overridden by subclasses to provide custom behaviors when
        results are fetched.  The method is passed the statement object and
        a set of "row processor" functions at query execution time; these
        processor functions when given a result row will return the
        individual attribute value, which can then be adapted into any kind
        of return data structure.

        The example below illustrates replacing the usual :class:`.Row`
        return structure with a straight Python dictionary::

            from sqlalchemy.orm import Bundle

            class DictBundle(Bundle):
                def create_row_processor(self, query, procs, labels):
                    'Override create_row_processor to return values as
                    dictionaries'

                    def proc(row):
                        return dict(
                            zip(labels, (proc(row) for proc in procs))
                        )
                    return proc

        A result from the above :class:`_orm.Bundle` will return dictionary
        values::

            bn = DictBundle('mybundle', MyClass.data1, MyClass.data2)
            for row in session.execute(
                select(bn).where(bn.c.data1 == 'd1')
            ):
                print(row.mybundle['data1'], row.mybundle['data2'])

        """
        keyed_tuple = result_tuple(labels, [() for _ in labels])

        def proc(row: Row[Unpack[TupleAny]]) -> Any:
            values = [getter(row) for getter in procs]
            return keyed_tuple(values)

        return proc
1772
1773
def _orm_annotate(element: _SA, exclude: Optional[Any] = None) -> _SA:
    """Deep copy the given ClauseElement, annotating each element with the
    "_orm_adapt" flag.

    Elements within the exclude collection will be cloned but not annotated.

    """
    orm_adapt_flag = {"_orm_adapt": True}
    return sql_util._deep_annotate(element, orm_adapt_flag, exclude)
1782
1783
def _orm_deannotate(element: _SA) -> _SA:
    """Remove annotations that link a column to a particular mapping.

    Only the "_orm_adapt" and "parententity" annotations are removed.
    Note this doesn't affect "remote" and "foreign" annotations
    passed by the :func:`_orm.foreign` and :func:`_orm.remote`
    annotators.

    """
    mapping_annotations = ("_orm_adapt", "parententity")
    return sql_util._deep_deannotate(element, values=mapping_annotations)
1796
1797
def _orm_full_deannotate(element: _SA) -> _SA:
    """Deep-deannotate ``element`` without restricting which annotation
    keys are removed."""
    deannotated = sql_util._deep_deannotate(element)
    return deannotated
1800
1801
class _ORMJoin(expression.Join):
    """Extend Join to support ORM constructs as input."""

    __visit_name__ = expression.Join.__visit_name__

    inherit_cache = True

    def __init__(
        self,
        left: _FromClauseArgument,
        right: _FromClauseArgument,
        onclause: Optional[_OnClauseArgument] = None,
        isouter: bool = False,
        full: bool = False,
        _left_memo: Optional[Any] = None,
        _right_memo: Optional[Any] = None,
        _extra_criteria: Tuple[ColumnElement[bool], ...] = (),
    ):
        """Construct the join.

        :param left: left side; passed through ``inspection.inspect()``.
        :param right: right side; passed through ``inspection.inspect()``.
        :param onclause: optional ON clause.  When this is a
         :class:`.QueryableAttribute` or a :class:`.MapperProperty`, the
         actual ON clause(s) are produced by the property's
         ``_create_joins()``; anything else is handed to
         ``expression.Join`` as given.
        :param isouter: passed to ``expression.Join``; also selects how a
         secondary join is arranged (see below).
        :param full: passed to ``expression.Join``.
        :param _left_memo: stored as ``_left_memo``.
        :param _right_memo: stored as ``_right_memo``.
        :param _extra_criteria: additional criteria; extended by the
         ``_extra_criteria`` of a :class:`.QueryableAttribute` ON clause.
         Passed to ``_create_joins()`` when a property is in use, otherwise
         AND-ed onto the ON clause after construction.

        """
        left_info = cast(
            "Union[FromClause, _InternalEntityType[Any]]",
            inspection.inspect(left),
        )

        right_info = cast(
            "Union[FromClause, _InternalEntityType[Any]]",
            inspection.inspect(right),
        )
        adapt_to = right_info.selectable

        # used by joined eager loader
        self._left_memo = _left_memo
        self._right_memo = _right_memo

        # work out the property (if any) the ON clause refers to, along
        # with the selectable that property originates from
        if isinstance(onclause, attributes.QueryableAttribute):
            if TYPE_CHECKING:
                assert isinstance(
                    onclause.comparator, RelationshipProperty.Comparator
                )
            on_selectable = onclause.comparator._source_selectable()
            prop = onclause.property
            _extra_criteria += onclause._extra_criteria
        elif isinstance(onclause, MapperProperty):
            # used internally by joined eager loader...possibly not ideal
            prop = onclause
            on_selectable = prop.parent.selectable
        else:
            prop = None
            on_selectable = None

        left_selectable = left_info.selectable
        if prop:
            adapt_from: Optional[FromClause]
            # use the property's own selectable as the source when it is
            # present in the left side; otherwise the whole left selectable
            if sql_util.clause_is_present(on_selectable, left_selectable):
                adapt_from = on_selectable
            else:
                assert isinstance(left_selectable, FromClause)
                adapt_from = left_selectable

            (
                pj,
                sj,
                source,
                dest,
                secondary,
                target_adapter,
            ) = prop._create_joins(
                source_selectable=adapt_from,
                dest_selectable=adapt_to,
                source_polymorphic=True,
                of_type_entity=right_info,
                alias_secondary=True,
                extra_criteria=_extra_criteria,
            )

            if sj is not None:
                # a secondary join is present: fold "secondary" into the
                # right side (outer join) or into the left side (otherwise)
                if isouter:
                    # note this is an inner join from secondary->right
                    right = sql.join(secondary, right, sj)
                    onclause = pj
                else:
                    left = sql.join(left, secondary, pj, isouter)
                    onclause = sj
            else:
                onclause = pj

            self._target_adapter = target_adapter

        # we don't use the normal coercions logic for _ORMJoin
        # (probably should), so do some gymnastics to get the entity.
        # logic here is for #8721, which was a major bug in 1.4
        # for almost two years, not reported/fixed until 1.4.43 (!)
        if is_selectable(left_info):
            parententity = left_selectable._annotations.get(
                "parententity", None
            )
        elif insp_is_mapper(left_info) or insp_is_aliased_class(left_info):
            parententity = left_info
        else:
            parententity = None

        if parententity is not None:
            self._annotations = self._annotations.union(
                {"parententity": parententity}
            )

        # with a property in play, the extra criteria were already given
        # to _create_joins() above
        augment_onclause = bool(_extra_criteria) and not prop
        expression.Join.__init__(self, left, right, onclause, isouter, full)

        assert self.onclause is not None

        if augment_onclause:
            self.onclause &= sql.and_(*_extra_criteria)

        if (
            not prop
            and getattr(right_info, "mapper", None)
            and right_info.mapper.single  # type: ignore
        ):
            right_info = cast("_InternalEntityType[Any]", right_info)
            # if single inheritance target and we are using a manual
            # or implicit ON clause, augment it the same way we'd augment the
            # WHERE.
            single_crit = right_info.mapper._single_table_criterion
            if single_crit is not None:
                if insp_is_aliased_class(right_info):
                    single_crit = right_info._adapter.traverse(single_crit)
                self.onclause = self.onclause & single_crit

    def _splice_into_center(self, other):
        """Splice a join into the center.

        Given join(a, b) and join(b, c), return join(a, b).join(c)

        """
        # descend to the leftmost element of "other"; it has to be the very
        # object that is this join's right side
        leftmost = other
        while isinstance(leftmost, sql.Join):
            leftmost = leftmost.left

        assert self.right is leftmost

        # re-create this join against the left side of "other", keeping our
        # ON clause, outer flag and left memo
        left = _ORMJoin(
            self.left,
            other.left,
            self.onclause,
            isouter=self.isouter,
            _left_memo=self._left_memo,
            _right_memo=None,
        )

        # then join that to the right side of "other", using the ON clause,
        # outer flag and right memo of "other"
        return _ORMJoin(
            left,
            other.right,
            other.onclause,
            isouter=other.isouter,
            _right_memo=other._right_memo,
        )

    def join(
        self,
        right: _FromClauseArgument,
        onclause: Optional[_OnClauseArgument] = None,
        isouter: bool = False,
        full: bool = False,
    ) -> _ORMJoin:
        """Return a new :class:`._ORMJoin` from this join to ``right``."""
        return _ORMJoin(self, right, onclause, full=full, isouter=isouter)

    def outerjoin(
        self,
        right: _FromClauseArgument,
        onclause: Optional[_OnClauseArgument] = None,
        full: bool = False,
    ) -> _ORMJoin:
        """Return a new :class:`._ORMJoin` from this join to ``right``
        with ``isouter=True``."""
        return _ORMJoin(self, right, onclause, isouter=True, full=full)
1975
1976
def with_parent(
    instance: object,
    prop: attributes.QueryableAttribute[Any],
    from_entity: Optional[_EntityType[Any]] = None,
) -> ColumnElement[bool]:
    """Create filtering criterion that relates this query's primary entity
    to the given related instance, using established
    :func:`_orm.relationship()`
    configuration.

    E.g.::

        stmt = select(Address).where(with_parent(some_user, User.addresses))


    The SQL rendered is the same as that rendered when a lazy loader
    would fire off from the given parent on that attribute, meaning
    that the appropriate state is taken from the parent object in
    Python without the need to render joins to the parent table
    in the rendered statement.

    The given property may also make use of :meth:`_orm.PropComparator.of_type`
    to indicate the left side of the criteria::


        a1 = aliased(Address)
        a2 = aliased(Address)
        stmt = select(a1, a2).where(
            with_parent(u1, User.addresses.of_type(a2))
        )

    The above use is equivalent to using the ``from_entity`` argument::

        a1 = aliased(Address)
        a2 = aliased(Address)
        stmt = select(a1, a2).where(
            with_parent(u1, User.addresses, from_entity=a2)
        )

    :param instance:
      An instance which has some :func:`_orm.relationship`.

    :param prop:
      Class-bound attribute, which indicates
      what relationship from the instance should be used to reconcile the
      parent/child relationship.  Strings are rejected.

    :param from_entity:
      Entity in which to consider as the left side.  This defaults to the
      "zero" entity of the :class:`_query.Query` itself.  It is overridden
      when ``prop`` carries an ``of_type()`` entity.

      .. versionadded:: 1.2

    :raises sqlalchemy.exc.ArgumentError: if ``prop`` is a string, or is a
     class-bound attribute that does not refer to a relationship.

    """
    if isinstance(prop, str):
        raise sa_exc.ArgumentError(
            "with_parent() accepts class-bound mapped attributes, not strings"
        )

    prop_t: RelationshipProperty[Any]

    if not isinstance(prop, attributes.QueryableAttribute):
        # anything else is used as the relationship property as-is
        prop_t = prop
    else:
        if prop._of_type:
            from_entity = prop._of_type

        mapper_property = prop.property
        is_relationship = mapper_property is not None and (
            prop_is_relationship(mapper_property)
        )
        if not is_relationship:
            raise sa_exc.ArgumentError(
                f"Expected relationship property for with_parent(), "
                f"got {mapper_property}"
            )
        prop_t = mapper_property

    return prop_t._with_parent(instance, from_entity=from_entity)
2054
2055
def has_identity(object_: object) -> bool:
    """Return True if the given object has a database
    identity.

    This typically corresponds to the object being
    in either the persistent or detached state.

    The value is read from the ``has_identity`` attribute of the object's
    instance state.

    .. seealso::

        :func:`.was_deleted`

    """
    return attributes.instance_state(object_).has_identity
2070
2071
def was_deleted(object_: object) -> bool:
    """Return True if the given object was deleted
    within a session flush.

    This is regardless of whether or not the object is
    persistent or detached.

    The value is read from the ``was_deleted`` attribute of the object's
    instance state.

    .. seealso::

        :attr:`.InstanceState.was_deleted`

    """
    return attributes.instance_state(object_).was_deleted
2087
2088
def _entity_corresponds_to(
    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
) -> bool:
    """Tell whether 'given' corresponds to 'entity', in the sense of an
    entity passed to Query matching the same entity referred to elsewhere
    in that query.

    """
    if insp_is_aliased_class(entity):
        # an aliased 'entity' only corresponds to another aliased entity
        # that shares the very same base alias object
        if (
            insp_is_aliased_class(given)
            and entity._base_alias() is given._base_alias()
        ):
            return True
        return False

    if insp_is_aliased_class(given):
        # 'entity' is not aliased at this point, 'given' is
        if given._use_mapper_path:
            return entity in given.with_polymorphic_mappers
        return entity is given

    # neither side is an aliased class
    assert insp_is_mapper(given)
    return entity.common_parent(given)
2110
2111
def _entity_corresponds_to_use_path_impl(
    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
) -> bool:
    """Tell whether 'given' corresponds to 'entity', in the sense of a
    path of loader options in which a mapped attribute is taken to be a
    member of a parent entity.

    e.g.::

        someoption(A).someoption(A.b)  # -> fn(A, A) -> True
        someoption(A).someoption(C.d)  # -> fn(A, C) -> False

        a1 = aliased(A)
        someoption(a1).someoption(A.b)  # -> fn(a1, A) -> False
        someoption(a1).someoption(a1.b)  # -> fn(a1, a1) -> True

        wp = with_polymorphic(A, [A1, A2])
        someoption(wp).someoption(A1.foo)  # -> fn(wp, A1) -> False
        someoption(wp).someoption(wp.A1.foo)  # -> fn(wp, wp.A1) -> True

    """
    if insp_is_aliased_class(given):
        # an aliased 'given' needs an aliased 'entity' that does not use
        # the mapper path, and that is either the same object or one of
        # given's with_polymorphic entities
        return (
            insp_is_aliased_class(entity)
            and not entity._use_mapper_path
            and (given is entity or entity in given._with_polymorphic_entities)
        )

    if insp_is_aliased_class(entity):
        # 'given' is not aliased but 'entity' is
        return (
            entity._use_mapper_path
            and given in entity.with_polymorphic_mappers
        )

    # neither side is aliased
    return given.isa(entity.mapper)
2147
2148
2149def _entity_isa(given: _InternalEntityType[Any], mapper: Mapper[Any]) -> bool:
2150 """determine if 'given' "is a" mapper, in terms of the given
2151 would load rows of type 'mapper'.
2152
2153 """
2154 if given.is_aliased_class:
2155 return mapper in given.with_polymorphic_mappers or given.mapper.isa(
2156 mapper
2157 )
2158 elif given.with_polymorphic_mappers:
2159 return mapper in given.with_polymorphic_mappers or given.isa(mapper)
2160 else:
2161 return given.isa(mapper)
2162
2163
2164def _getitem(iterable_query: Query[Any], item: Any) -> Any:
2165 """calculate __getitem__ in terms of an iterable query object
2166 that also has a slice() method.
2167
2168 """
2169
2170 def _no_negative_indexes():
2171 raise IndexError(
2172 "negative indexes are not accepted by SQL "
2173 "index / slice operators"
2174 )
2175
2176 if isinstance(item, slice):
2177 start, stop, step = util.decode_slice(item)
2178
2179 if (
2180 isinstance(stop, int)
2181 and isinstance(start, int)
2182 and stop - start <= 0
2183 ):
2184 return []
2185
2186 elif (isinstance(start, int) and start < 0) or (
2187 isinstance(stop, int) and stop < 0
2188 ):
2189 _no_negative_indexes()
2190
2191 res = iterable_query.slice(start, stop)
2192 if step is not None:
2193 return list(res)[None : None : item.step]
2194 else:
2195 return list(res)
2196 else:
2197 if item == -1:
2198 _no_negative_indexes()
2199 else:
2200 return list(iterable_query[item : item + 1])[0]
2201
2202
def _is_mapped_annotation(
    raw_annotation: _AnnotationScanType,
    cls: Type[Any],
    originating_cls: Type[Any],
) -> bool:
    """Return the result of ``is_origin_of_cls()`` against
    ``_MappedAnnotationBase`` for the de-stringified form of the given
    annotation.

    The annotation is de-stringified against ``cls``, using the module of
    ``originating_cls``.  If that step raises ``NameError``, False is
    returned.

    """
    try:
        resolved = de_stringify_annotation(
            cls, raw_annotation, originating_cls.__module__
        )
    except NameError:
        # raising here would be more accurate, as it would prevent false
        # negatives, and in most cases within our own tests that would
        # work.  In the real world however, stay out of the way of
        # end-user annotations that have nothing to do with us.
        # see issue #8888 where use of this function is bypassed in the
        # case that an unresolvable Mapped[] type is to be detected.
        return False

    return is_origin_of_cls(resolved, _MappedAnnotationBase)
2222
2223
2224class _CleanupError(Exception):
2225 pass
2226
2227
2228def _cleanup_mapped_str_annotation(
2229 annotation: str, originating_module: str
2230) -> str:
2231 # fix up an annotation that comes in as the form:
2232 # 'Mapped[List[Address]]' so that it instead looks like:
2233 # 'Mapped[List["Address"]]' , which will allow us to get
2234 # "Address" as a string
2235
2236 # additionally, resolve symbols for these names since this is where
2237 # we'd have to do it
2238
2239 inner: Optional[Match[str]]
2240
2241 mm = re.match(r"^(.+?)\[(.+)\]$", annotation)
2242
2243 if not mm:
2244 return annotation
2245
2246 # ticket #8759. Resolve the Mapped name to a real symbol.
2247 # originally this just checked the name.
2248 try:
2249 obj = eval_name_only(mm.group(1), originating_module)
2250 except NameError as ne:
2251 raise _CleanupError(
2252 f'For annotation "{annotation}", could not resolve '
2253 f'container type "{mm.group(1)}". '
2254 "Please ensure this type is imported at the module level "
2255 "outside of TYPE_CHECKING blocks"
2256 ) from ne
2257
2258 if obj is typing.ClassVar:
2259 real_symbol = "ClassVar"
2260 else:
2261 try:
2262 if issubclass(obj, _MappedAnnotationBase):
2263 real_symbol = obj.__name__
2264 else:
2265 return annotation
2266 except TypeError:
2267 # avoid isinstance(obj, type) check, just catch TypeError
2268 return annotation
2269
2270 # note: if one of the codepaths above didn't define real_symbol and
2271 # then didn't return, real_symbol raises UnboundLocalError
2272 # which is actually a NameError, and the calling routines don't
2273 # notice this since they are catching NameError anyway. Just in case
2274 # this is being modified in the future, something to be aware of.
2275
2276 stack = []
2277 inner = mm
2278 while True:
2279 stack.append(real_symbol if mm is inner else inner.group(1))
2280 g2 = inner.group(2)
2281 inner = re.match(r"^(.+?)\[(.+)\]$", g2)
2282 if inner is None:
2283 stack.append(g2)
2284 break
2285
2286 # stacks we want to rewrite, that is, quote the last entry which
2287 # we think is a relationship class name:
2288 #
2289 # ['Mapped', 'List', 'Address']
2290 # ['Mapped', 'A']
2291 #
2292 # stacks we dont want to rewrite, which are generally MappedColumn
2293 # use cases:
2294 #
2295 # ['Mapped', "'Optional[Dict[str, str]]'"]
2296 # ['Mapped', 'dict[str, str] | None']
2297
2298 if (
2299 # avoid already quoted symbols such as
2300 # ['Mapped', "'Optional[Dict[str, str]]'"]
2301 not re.match(r"""^["'].*["']$""", stack[-1])
2302 # avoid further generics like Dict[] such as
2303 # ['Mapped', 'dict[str, str] | None']
2304 and not re.match(r".*\[.*\]", stack[-1])
2305 ):
2306 stripchars = "\"' "
2307 stack[-1] = ", ".join(
2308 f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",")
2309 )
2310
2311 annotation = "[".join(stack) + ("]" * (len(stack) - 1))
2312
2313 return annotation
2314
2315
2316def _extract_mapped_subtype(
2317 raw_annotation: Optional[_AnnotationScanType],
2318 cls: type,
2319 originating_module: str,
2320 key: str,
2321 attr_cls: Type[Any],
2322 required: bool,
2323 is_dataclass_field: bool,
2324 expect_mapped: bool = True,
2325 raiseerr: bool = True,
2326) -> Optional[Tuple[Union[type, str], Optional[type]]]:
2327 """given an annotation, figure out if it's ``Mapped[something]`` and if
2328 so, return the ``something`` part.
2329
2330 Includes error raise scenarios and other options.
2331
2332 """
2333
2334 if raw_annotation is None:
2335 if required:
2336 raise sa_exc.ArgumentError(
2337 f"Python typing annotation is required for attribute "
2338 f'"{cls.__name__}.{key}" when primary argument(s) for '
2339 f'"{attr_cls.__name__}" construct are None or not present'
2340 )
2341 return None
2342
2343 try:
2344 annotated = de_stringify_annotation(
2345 cls,
2346 raw_annotation,
2347 originating_module,
2348 str_cleanup_fn=_cleanup_mapped_str_annotation,
2349 )
2350 except _CleanupError as ce:
2351 raise sa_exc.ArgumentError(
2352 f"Could not interpret annotation {raw_annotation}. "
2353 "Check that it uses names that are correctly imported at the "
2354 "module level. See chained stack trace for more hints."
2355 ) from ce
2356 except NameError as ne:
2357 if raiseerr and "Mapped[" in raw_annotation: # type: ignore
2358 raise sa_exc.ArgumentError(
2359 f"Could not interpret annotation {raw_annotation}. "
2360 "Check that it uses names that are correctly imported at the "
2361 "module level. See chained stack trace for more hints."
2362 ) from ne
2363
2364 annotated = raw_annotation # type: ignore
2365
2366 if is_dataclass_field:
2367 return annotated, None
2368 else:
2369 if not hasattr(annotated, "__origin__") or not is_origin_of_cls(
2370 annotated, _MappedAnnotationBase
2371 ):
2372 if expect_mapped:
2373 if not raiseerr:
2374 return None
2375
2376 origin = getattr(annotated, "__origin__", None)
2377 if origin is typing.ClassVar:
2378 return None
2379
2380 # check for other kind of ORM descriptor like AssociationProxy,
2381 # don't raise for that (issue #9957)
2382 elif isinstance(origin, type) and issubclass(
2383 origin, ORMDescriptor
2384 ):
2385 return None
2386
2387 raise sa_exc.ArgumentError(
2388 f'Type annotation for "{cls.__name__}.{key}" '
2389 "can't be correctly interpreted for "
2390 "Annotated Declarative Table form. ORM annotations "
2391 "should normally make use of the ``Mapped[]`` generic "
2392 "type, or other ORM-compatible generic type, as a "
2393 "container for the actual type, which indicates the "
2394 "intent that the attribute is mapped. "
2395 "Class variables that are not intended to be mapped "
2396 "by the ORM should use ClassVar[]. "
2397 "To allow Annotated Declarative to disregard legacy "
2398 "annotations which don't use Mapped[] to pass, set "
2399 '"__allow_unmapped__ = True" on the class or a '
2400 "superclass this class.",
2401 code="zlpr",
2402 )
2403
2404 else:
2405 return annotated, None
2406
2407 if len(annotated.__args__) != 1:
2408 raise sa_exc.ArgumentError(
2409 "Expected sub-type for Mapped[] annotation"
2410 )
2411
2412 return annotated.__args__[0], annotated.__origin__
2413
2414
def _mapper_property_as_plain_name(prop: Type[Any]) -> str:
    """Return ``util.clsname_as_plain_name()`` for the given class.

    The second argument passed along is the result of the class's
    ``_mapper_property_name()`` hook when the class has one, and ``None``
    otherwise.

    """
    name = (
        prop._mapper_property_name()
        if hasattr(prop, "_mapper_property_name")
        else None
    )
    return util.clsname_as_plain_name(prop, name)