1# orm/decl_base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Internal implementation for declarative."""
9
10from __future__ import annotations
11
12import collections
13import dataclasses
14import re
15from typing import Any
16from typing import Callable
17from typing import cast
18from typing import Dict
19from typing import Iterable
20from typing import List
21from typing import Mapping
22from typing import NamedTuple
23from typing import NoReturn
24from typing import Optional
25from typing import Protocol
26from typing import Sequence
27from typing import Tuple
28from typing import Type
29from typing import TYPE_CHECKING
30from typing import TypeVar
31from typing import Union
32import weakref
33
34from . import attributes
35from . import clsregistry
36from . import exc as orm_exc
37from . import instrumentation
38from . import mapperlib
39from ._typing import _O
40from ._typing import attr_is_internal_proxy
41from .attributes import InstrumentedAttribute
42from .attributes import QueryableAttribute
43from .base import _is_mapped_class
44from .base import InspectionAttr
45from .descriptor_props import CompositeProperty
46from .descriptor_props import SynonymProperty
47from .interfaces import _AttributeOptions
48from .interfaces import _DataclassArguments
49from .interfaces import _DCAttributeOptions
50from .interfaces import _IntrospectsAnnotations
51from .interfaces import _MappedAttribute
52from .interfaces import _MapsColumns
53from .interfaces import MapperProperty
54from .mapper import Mapper
55from .properties import ColumnProperty
56from .properties import MappedColumn
57from .util import _extract_mapped_subtype
58from .util import _is_mapped_annotation
59from .util import class_mapper
60from .util import de_stringify_annotation
61from .. import event
62from .. import exc
63from .. import util
64from ..sql import expression
65from ..sql.base import _NoArg
66from ..sql.schema import Column
67from ..sql.schema import Table
68from ..util import topological
69from ..util.typing import _AnnotationScanType
70from ..util.typing import get_args
71from ..util.typing import is_fwd_ref
72from ..util.typing import is_literal
73
74if TYPE_CHECKING:
75 from ._typing import _ClassDict
76 from ._typing import _RegistryType
77 from .base import Mapped
78 from .decl_api import declared_attr
79 from .instrumentation import ClassManager
80 from ..sql.elements import NamedColumn
81 from ..sql.schema import MetaData
82 from ..sql.selectable import FromClause
83
84_T = TypeVar("_T", bound=Any)
85
86_MapperKwArgs = Mapping[str, Any]
87_TableArgsType = Union[Tuple[Any, ...], Dict[str, Any]]
88
89
90class MappedClassProtocol(Protocol[_O]):
91 """A protocol representing a SQLAlchemy mapped class.
92
93 The protocol is generic on the type of class, use
94 ``MappedClassProtocol[Any]`` to allow any mapped class.
95 """
96
97 __name__: str
98 __mapper__: Mapper[_O]
99 __table__: FromClause
100
101 def __call__(self, **kw: Any) -> _O: ...
102
103
104class _DeclMappedClassProtocol(MappedClassProtocol[_O], Protocol):
105 "Internal more detailed version of ``MappedClassProtocol``."
106
107 metadata: MetaData
108 __tablename__: str
109 __mapper_args__: _MapperKwArgs
110 __table_args__: Optional[_TableArgsType]
111
112 _sa_apply_dc_transforms: Optional[_DataclassArguments]
113
114 def __declare_first__(self) -> None: ...
115
116 def __declare_last__(self) -> None: ...
117
118
119def _declared_mapping_info(
120 cls: Type[Any],
121) -> Optional[Union[_DeferredMapperConfig, Mapper[Any]]]:
122 # deferred mapping
123 if _DeferredMapperConfig.has_cls(cls):
124 return _DeferredMapperConfig.config_for_cls(cls)
125 # regular mapping
126 elif _is_mapped_class(cls):
127 return class_mapper(cls, configure=False)
128 else:
129 return None
130
131
132def _is_supercls_for_inherits(cls: Type[Any]) -> bool:
133 """return True if this class will be used as a superclass to set in
134 'inherits'.
135
136 This includes deferred mapper configs that aren't mapped yet, however does
137 not include classes with _sa_decl_prepare_nocascade (e.g.
138 ``AbstractConcreteBase``); these concrete-only classes are not set up as
139 "inherits" until after mappers are configured using
140 mapper._set_concrete_base()
141
142 """
143 if _DeferredMapperConfig.has_cls(cls):
144 return not _get_immediate_cls_attr(
145 cls, "_sa_decl_prepare_nocascade", strict=True
146 )
147 # regular mapping
148 elif _is_mapped_class(cls):
149 return True
150 else:
151 return False
152
153
154def _resolve_for_abstract_or_classical(cls: Type[Any]) -> Optional[Type[Any]]:
155 if cls is object:
156 return None
157
158 sup: Optional[Type[Any]]
159
160 if cls.__dict__.get("__abstract__", False):
161 for base_ in cls.__bases__:
162 sup = _resolve_for_abstract_or_classical(base_)
163 if sup is not None:
164 return sup
165 else:
166 return None
167 else:
168 clsmanager = _dive_for_cls_manager(cls)
169
170 if clsmanager:
171 return clsmanager.class_
172 else:
173 return cls
174
175
176def _get_immediate_cls_attr(
177 cls: Type[Any], attrname: str, strict: bool = False
178) -> Optional[Any]:
179 """return an attribute of the class that is either present directly
180 on the class, e.g. not on a superclass, or is from a superclass but
181 this superclass is a non-mapped mixin, that is, not a descendant of
182 the declarative base and is also not classically mapped.
183
184 This is used to detect attributes that indicate something about
185 a mapped class independently from any mapped classes that it may
186 inherit from.
187
188 """
189
190 # the rules are different for this name than others,
191 # make sure we've moved it out. transitional
192 assert attrname != "__abstract__"
193
194 if not issubclass(cls, object):
195 return None
196
197 if attrname in cls.__dict__:
198 return getattr(cls, attrname)
199
200 for base in cls.__mro__[1:]:
201 _is_classical_inherits = _dive_for_cls_manager(base) is not None
202
203 if attrname in base.__dict__ and (
204 base is cls
205 or (
206 (base in cls.__bases__ if strict else True)
207 and not _is_classical_inherits
208 )
209 ):
210 return getattr(base, attrname)
211 else:
212 return None
213
214
215def _dive_for_cls_manager(cls: Type[_O]) -> Optional[ClassManager[_O]]:
216 # because the class manager registration is pluggable,
217 # we need to do the search for every class in the hierarchy,
218 # rather than just a simple "cls._sa_class_manager"
219
220 for base in cls.__mro__:
221 manager: Optional[ClassManager[_O]] = attributes.opt_manager_of_class(
222 base
223 )
224 if manager:
225 return manager
226 return None
227
228
229def _as_declarative(
230 registry: _RegistryType, cls: Type[Any], dict_: _ClassDict
231) -> Optional[_MapperConfig]:
232 # declarative scans the class for attributes. no table or mapper
233 # args passed separately.
234 return _MapperConfig.setup_mapping(registry, cls, dict_, None, {})
235
236
237def _mapper(
238 registry: _RegistryType,
239 cls: Type[_O],
240 table: Optional[FromClause],
241 mapper_kw: _MapperKwArgs,
242) -> Mapper[_O]:
243 _ImperativeMapperConfig(registry, cls, table, mapper_kw)
244 return cast("MappedClassProtocol[_O]", cls).__mapper__
245
246
247@util.preload_module("sqlalchemy.orm.decl_api")
248def _is_declarative_props(obj: Any) -> bool:
249 _declared_attr_common = util.preloaded.orm_decl_api._declared_attr_common
250
251 return isinstance(obj, (_declared_attr_common, util.classproperty))
252
253
254def _check_declared_props_nocascade(
255 obj: Any, name: str, cls: Type[_O]
256) -> bool:
257 if _is_declarative_props(obj):
258 if getattr(obj, "_cascading", False):
259 util.warn(
260 "@declared_attr.cascading is not supported on the %s "
261 "attribute on class %s. This attribute invokes for "
262 "subclasses in any case." % (name, cls)
263 )
264 return True
265 else:
266 return False
267
268
269class _MapperConfig:
270 __slots__ = (
271 "cls",
272 "classname",
273 "properties",
274 "declared_attr_reg",
275 "__weakref__",
276 )
277
278 cls: Type[Any]
279 classname: str
280 properties: util.OrderedDict[
281 str,
282 Union[
283 Sequence[NamedColumn[Any]], NamedColumn[Any], MapperProperty[Any]
284 ],
285 ]
286 declared_attr_reg: Dict[declared_attr[Any], Any]
287
288 @classmethod
289 def setup_mapping(
290 cls,
291 registry: _RegistryType,
292 cls_: Type[_O],
293 dict_: _ClassDict,
294 table: Optional[FromClause],
295 mapper_kw: _MapperKwArgs,
296 ) -> Optional[_MapperConfig]:
297 manager = attributes.opt_manager_of_class(cls)
298 if manager and manager.class_ is cls_:
299 raise exc.InvalidRequestError(
300 f"Class {cls!r} already has been instrumented declaratively"
301 )
302
303 if cls_.__dict__.get("__abstract__", False):
304 return None
305
306 defer_map = _get_immediate_cls_attr(
307 cls_, "_sa_decl_prepare_nocascade", strict=True
308 ) or hasattr(cls_, "_sa_decl_prepare")
309
310 if defer_map:
311 return _DeferredMapperConfig(
312 registry, cls_, dict_, table, mapper_kw
313 )
314 else:
315 return _ClassScanMapperConfig(
316 registry, cls_, dict_, table, mapper_kw
317 )
318
319 def __init__(
320 self,
321 registry: _RegistryType,
322 cls_: Type[Any],
323 mapper_kw: _MapperKwArgs,
324 ):
325 self.cls = util.assert_arg_type(cls_, type, "cls_")
326 self.classname = cls_.__name__
327 self.properties = util.OrderedDict()
328 self.declared_attr_reg = {}
329
330 instrumentation.register_class(
331 self.cls,
332 finalize=False,
333 registry=registry,
334 declarative_scan=self,
335 init_method=registry.constructor,
336 )
337
338 def set_cls_attribute(self, attrname: str, value: _T) -> _T:
339 manager = instrumentation.manager_of_class(self.cls)
340 manager.install_member(attrname, value)
341 return value
342
343 def map(self, mapper_kw: _MapperKwArgs = ...) -> Mapper[Any]:
344 raise NotImplementedError()
345
346 def _early_mapping(self, mapper_kw: _MapperKwArgs) -> None:
347 self.map(mapper_kw)
348
349
350class _ImperativeMapperConfig(_MapperConfig):
351 __slots__ = ("local_table", "inherits")
352
353 def __init__(
354 self,
355 registry: _RegistryType,
356 cls_: Type[_O],
357 table: Optional[FromClause],
358 mapper_kw: _MapperKwArgs,
359 ):
360 super().__init__(registry, cls_, mapper_kw)
361
362 self.local_table = self.set_cls_attribute("__table__", table)
363
364 with mapperlib._CONFIGURE_MUTEX:
365 clsregistry._add_class(
366 self.classname, self.cls, registry._class_registry
367 )
368
369 self._setup_inheritance(mapper_kw)
370
371 self._early_mapping(mapper_kw)
372
373 def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]:
374 mapper_cls = Mapper
375
376 return self.set_cls_attribute(
377 "__mapper__",
378 mapper_cls(self.cls, self.local_table, **mapper_kw),
379 )
380
381 def _setup_inheritance(self, mapper_kw: _MapperKwArgs) -> None:
382 cls = self.cls
383
384 inherits = mapper_kw.get("inherits", None)
385
386 if inherits is None:
387 # since we search for classical mappings now, search for
388 # multiple mapped bases as well and raise an error.
389 inherits_search = []
390 for base_ in cls.__bases__:
391 c = _resolve_for_abstract_or_classical(base_)
392 if c is None:
393 continue
394
395 if _is_supercls_for_inherits(c) and c not in inherits_search:
396 inherits_search.append(c)
397
398 if inherits_search:
399 if len(inherits_search) > 1:
400 raise exc.InvalidRequestError(
401 "Class %s has multiple mapped bases: %r"
402 % (cls, inherits_search)
403 )
404 inherits = inherits_search[0]
405 elif isinstance(inherits, Mapper):
406 inherits = inherits.class_
407
408 self.inherits = inherits
409
410
411class _CollectedAnnotation(NamedTuple):
412 raw_annotation: _AnnotationScanType
413 mapped_container: Optional[Type[Mapped[Any]]]
414 extracted_mapped_annotation: Union[_AnnotationScanType, str]
415 is_dataclass: bool
416 attr_value: Any
417 originating_module: str
418 originating_class: Type[Any]
419
420
421class _ClassScanMapperConfig(_MapperConfig):
422 __slots__ = (
423 "registry",
424 "clsdict_view",
425 "collected_attributes",
426 "collected_annotations",
427 "local_table",
428 "persist_selectable",
429 "declared_columns",
430 "column_ordering",
431 "column_copies",
432 "table_args",
433 "tablename",
434 "mapper_args",
435 "mapper_args_fn",
436 "table_fn",
437 "inherits",
438 "single",
439 "allow_dataclass_fields",
440 "dataclass_setup_arguments",
441 "is_dataclass_prior_to_mapping",
442 "allow_unmapped_annotations",
443 )
444
445 is_deferred = False
446 registry: _RegistryType
447 clsdict_view: _ClassDict
448 collected_annotations: Dict[str, _CollectedAnnotation]
449 collected_attributes: Dict[str, Any]
450 local_table: Optional[FromClause]
451 persist_selectable: Optional[FromClause]
452 declared_columns: util.OrderedSet[Column[Any]]
453 column_ordering: Dict[Column[Any], int]
454 column_copies: Dict[
455 Union[MappedColumn[Any], Column[Any]],
456 Union[MappedColumn[Any], Column[Any]],
457 ]
458 tablename: Optional[str]
459 mapper_args: Mapping[str, Any]
460 table_args: Optional[_TableArgsType]
461 mapper_args_fn: Optional[Callable[[], Dict[str, Any]]]
462 inherits: Optional[Type[Any]]
463 single: bool
464
465 is_dataclass_prior_to_mapping: bool
466 allow_unmapped_annotations: bool
467
468 dataclass_setup_arguments: Optional[_DataclassArguments]
469 """if the class has SQLAlchemy native dataclass parameters, where
470 we will turn the class into a dataclass within the declarative mapping
471 process.
472
473 """
474
475 allow_dataclass_fields: bool
476 """if true, look for dataclass-processed Field objects on the target
477 class as well as superclasses and extract ORM mapping directives from
478 the "metadata" attribute of each Field.
479
480 if False, dataclass fields can still be used, however they won't be
481 mapped.
482
483 """
484
485 def __init__(
486 self,
487 registry: _RegistryType,
488 cls_: Type[_O],
489 dict_: _ClassDict,
490 table: Optional[FromClause],
491 mapper_kw: _MapperKwArgs,
492 ):
493 # grab class dict before the instrumentation manager has been added.
494 # reduces cycles
495 self.clsdict_view = (
496 util.immutabledict(dict_) if dict_ else util.EMPTY_DICT
497 )
498 super().__init__(registry, cls_, mapper_kw)
499 self.registry = registry
500 self.persist_selectable = None
501
502 self.collected_attributes = {}
503 self.collected_annotations = {}
504 self.declared_columns = util.OrderedSet()
505 self.column_ordering = {}
506 self.column_copies = {}
507 self.single = False
508 self.dataclass_setup_arguments = dca = getattr(
509 self.cls, "_sa_apply_dc_transforms", None
510 )
511
512 self.allow_unmapped_annotations = getattr(
513 self.cls, "__allow_unmapped__", False
514 ) or bool(self.dataclass_setup_arguments)
515
516 self.is_dataclass_prior_to_mapping = cld = dataclasses.is_dataclass(
517 cls_
518 )
519
520 sdk = _get_immediate_cls_attr(cls_, "__sa_dataclass_metadata_key__")
521
522 # we don't want to consume Field objects from a not-already-dataclass.
523 # the Field objects won't have their "name" or "type" populated,
524 # and while it seems like we could just set these on Field as we
525 # read them, Field is documented as "user read only" and we need to
526 # stay far away from any off-label use of dataclasses APIs.
527 if (not cld or dca) and sdk:
528 raise exc.InvalidRequestError(
529 "SQLAlchemy mapped dataclasses can't consume mapping "
530 "information from dataclass.Field() objects if the immediate "
531 "class is not already a dataclass."
532 )
533
534 # if already a dataclass, and __sa_dataclass_metadata_key__ present,
535 # then also look inside of dataclass.Field() objects yielded by
536 # dataclasses.get_fields(cls) when scanning for attributes
537 self.allow_dataclass_fields = bool(sdk and cld)
538
539 self._setup_declared_events()
540
541 self._scan_attributes()
542
543 self._setup_dataclasses_transforms()
544
545 with mapperlib._CONFIGURE_MUTEX:
546 clsregistry._add_class(
547 self.classname, self.cls, registry._class_registry
548 )
549
550 self._setup_inheriting_mapper(mapper_kw)
551
552 self._extract_mappable_attributes()
553
554 self._extract_declared_columns()
555
556 self._setup_table(table)
557
558 self._setup_inheriting_columns(mapper_kw)
559
560 self._early_mapping(mapper_kw)
561
562 def _setup_declared_events(self) -> None:
563 if _get_immediate_cls_attr(self.cls, "__declare_last__"):
564
565 @event.listens_for(Mapper, "after_configured")
566 def after_configured() -> None:
567 cast(
568 "_DeclMappedClassProtocol[Any]", self.cls
569 ).__declare_last__()
570
571 if _get_immediate_cls_attr(self.cls, "__declare_first__"):
572
573 @event.listens_for(Mapper, "before_configured")
574 def before_configured() -> None:
575 cast(
576 "_DeclMappedClassProtocol[Any]", self.cls
577 ).__declare_first__()
578
579 def _cls_attr_override_checker(
580 self, cls: Type[_O]
581 ) -> Callable[[str, Any], bool]:
582 """Produce a function that checks if a class has overridden an
583 attribute, taking SQLAlchemy-enabled dataclass fields into account.
584
585 """
586
587 if self.allow_dataclass_fields:
588 sa_dataclass_metadata_key = _get_immediate_cls_attr(
589 cls, "__sa_dataclass_metadata_key__"
590 )
591 else:
592 sa_dataclass_metadata_key = None
593
594 if not sa_dataclass_metadata_key:
595
596 def attribute_is_overridden(key: str, obj: Any) -> bool:
597 return getattr(cls, key, obj) is not obj
598
599 else:
600 all_datacls_fields = {
601 f.name: f.metadata[sa_dataclass_metadata_key]
602 for f in util.dataclass_fields(cls)
603 if sa_dataclass_metadata_key in f.metadata
604 }
605 local_datacls_fields = {
606 f.name: f.metadata[sa_dataclass_metadata_key]
607 for f in util.local_dataclass_fields(cls)
608 if sa_dataclass_metadata_key in f.metadata
609 }
610
611 absent = object()
612
613 def attribute_is_overridden(key: str, obj: Any) -> bool:
614 if _is_declarative_props(obj):
615 obj = obj.fget
616
617 # this function likely has some failure modes still if
618 # someone is doing a deep mixing of the same attribute
619 # name as plain Python attribute vs. dataclass field.
620
621 ret = local_datacls_fields.get(key, absent)
622 if _is_declarative_props(ret):
623 ret = ret.fget
624
625 if ret is obj:
626 return False
627 elif ret is not absent:
628 return True
629
630 all_field = all_datacls_fields.get(key, absent)
631
632 ret = getattr(cls, key, obj)
633
634 if ret is obj:
635 return False
636
637 # for dataclasses, this could be the
638 # 'default' of the field. so filter more specifically
639 # for an already-mapped InstrumentedAttribute
640 if ret is not absent and isinstance(
641 ret, InstrumentedAttribute
642 ):
643 return True
644
645 if all_field is obj:
646 return False
647 elif all_field is not absent:
648 return True
649
650 # can't find another attribute
651 return False
652
653 return attribute_is_overridden
654
655 _include_dunders = {
656 "__table__",
657 "__mapper_args__",
658 "__tablename__",
659 "__table_args__",
660 }
661
662 _match_exclude_dunders = re.compile(r"^(?:_sa_|__)")
663
664 def _cls_attr_resolver(
665 self, cls: Type[Any]
666 ) -> Callable[[], Iterable[Tuple[str, Any, Any, bool]]]:
667 """produce a function to iterate the "attributes" of a class
668 which we want to consider for mapping, adjusting for SQLAlchemy fields
669 embedded in dataclass fields.
670
671 """
672 cls_annotations = util.get_annotations(cls)
673
674 cls_vars = vars(cls)
675
676 _include_dunders = self._include_dunders
677 _match_exclude_dunders = self._match_exclude_dunders
678
679 names = [
680 n
681 for n in util.merge_lists_w_ordering(
682 list(cls_vars), list(cls_annotations)
683 )
684 if not _match_exclude_dunders.match(n) or n in _include_dunders
685 ]
686
687 if self.allow_dataclass_fields:
688 sa_dataclass_metadata_key: Optional[str] = _get_immediate_cls_attr(
689 cls, "__sa_dataclass_metadata_key__"
690 )
691 else:
692 sa_dataclass_metadata_key = None
693
694 if not sa_dataclass_metadata_key:
695
696 def local_attributes_for_class() -> (
697 Iterable[Tuple[str, Any, Any, bool]]
698 ):
699 return (
700 (
701 name,
702 cls_vars.get(name),
703 cls_annotations.get(name),
704 False,
705 )
706 for name in names
707 )
708
709 else:
710 dataclass_fields = {
711 field.name: field for field in util.local_dataclass_fields(cls)
712 }
713
714 fixed_sa_dataclass_metadata_key = sa_dataclass_metadata_key
715
716 def local_attributes_for_class() -> (
717 Iterable[Tuple[str, Any, Any, bool]]
718 ):
719 for name in names:
720 field = dataclass_fields.get(name, None)
721 if field and sa_dataclass_metadata_key in field.metadata:
722 yield field.name, _as_dc_declaredattr(
723 field.metadata, fixed_sa_dataclass_metadata_key
724 ), cls_annotations.get(field.name), True
725 else:
726 yield name, cls_vars.get(name), cls_annotations.get(
727 name
728 ), False
729
730 return local_attributes_for_class
731
732 def _scan_attributes(self) -> None:
733 cls = self.cls
734
735 cls_as_Decl = cast("_DeclMappedClassProtocol[Any]", cls)
736
737 clsdict_view = self.clsdict_view
738 collected_attributes = self.collected_attributes
739 column_copies = self.column_copies
740 _include_dunders = self._include_dunders
741 mapper_args_fn = None
742 table_args = inherited_table_args = None
743 table_fn = None
744 tablename = None
745 fixed_table = "__table__" in clsdict_view
746
747 attribute_is_overridden = self._cls_attr_override_checker(self.cls)
748
749 bases = []
750
751 for base in cls.__mro__:
752 # collect bases and make sure standalone columns are copied
753 # to be the column they will ultimately be on the class,
754 # so that declared_attr functions use the right columns.
755 # need to do this all the way up the hierarchy first
756 # (see #8190)
757
758 class_mapped = base is not cls and _is_supercls_for_inherits(base)
759
760 local_attributes_for_class = self._cls_attr_resolver(base)
761
762 if not class_mapped and base is not cls:
763 locally_collected_columns = self._produce_column_copies(
764 local_attributes_for_class,
765 attribute_is_overridden,
766 fixed_table,
767 base,
768 )
769 else:
770 locally_collected_columns = {}
771
772 bases.append(
773 (
774 base,
775 class_mapped,
776 local_attributes_for_class,
777 locally_collected_columns,
778 )
779 )
780
781 for (
782 base,
783 class_mapped,
784 local_attributes_for_class,
785 locally_collected_columns,
786 ) in bases:
787 # this transfer can also take place as we scan each name
788 # for finer-grained control of how collected_attributes is
789 # populated, as this is what impacts column ordering.
790 # however it's simpler to get it out of the way here.
791 collected_attributes.update(locally_collected_columns)
792
793 for (
794 name,
795 obj,
796 annotation,
797 is_dataclass_field,
798 ) in local_attributes_for_class():
799 if name in _include_dunders:
800 if name == "__mapper_args__":
801 check_decl = _check_declared_props_nocascade(
802 obj, name, cls
803 )
804 if not mapper_args_fn and (
805 not class_mapped or check_decl
806 ):
807 # don't even invoke __mapper_args__ until
808 # after we've determined everything about the
809 # mapped table.
810 # make a copy of it so a class-level dictionary
811 # is not overwritten when we update column-based
812 # arguments.
813 def _mapper_args_fn() -> Dict[str, Any]:
814 return dict(cls_as_Decl.__mapper_args__)
815
816 mapper_args_fn = _mapper_args_fn
817
818 elif name == "__tablename__":
819 check_decl = _check_declared_props_nocascade(
820 obj, name, cls
821 )
822 if not tablename and (not class_mapped or check_decl):
823 tablename = cls_as_Decl.__tablename__
824 elif name == "__table__":
825 check_decl = _check_declared_props_nocascade(
826 obj, name, cls
827 )
828 # if a @declared_attr using "__table__" is detected,
829 # wrap up a callable to look for "__table__" from
830 # the final concrete class when we set up a table.
831 # this was fixed by
832 # #11509, regression in 2.0 from version 1.4.
833 if check_decl and not table_fn:
834 # don't even invoke __table__ until we're ready
835 def _table_fn() -> FromClause:
836 return cls_as_Decl.__table__
837
838 table_fn = _table_fn
839
840 elif name == "__table_args__":
841 check_decl = _check_declared_props_nocascade(
842 obj, name, cls
843 )
844 if not table_args and (not class_mapped or check_decl):
845 table_args = cls_as_Decl.__table_args__
846 if not isinstance(
847 table_args, (tuple, dict, type(None))
848 ):
849 raise exc.ArgumentError(
850 "__table_args__ value must be a tuple, "
851 "dict, or None"
852 )
853 if base is not cls:
854 inherited_table_args = True
855 else:
856 # any other dunder names; should not be here
857 # as we have tested for all four names in
858 # _include_dunders
859 assert False
860 elif class_mapped:
861 if _is_declarative_props(obj) and not obj._quiet:
862 util.warn(
863 "Regular (i.e. not __special__) "
864 "attribute '%s.%s' uses @declared_attr, "
865 "but owning class %s is mapped - "
866 "not applying to subclass %s."
867 % (base.__name__, name, base, cls)
868 )
869
870 continue
871 elif base is not cls:
872 # we're a mixin, abstract base, or something that is
873 # acting like that for now.
874
875 if isinstance(obj, (Column, MappedColumn)):
876 # already copied columns to the mapped class.
877 continue
878 elif isinstance(obj, MapperProperty):
879 raise exc.InvalidRequestError(
880 "Mapper properties (i.e. deferred,"
881 "column_property(), relationship(), etc.) must "
882 "be declared as @declared_attr callables "
883 "on declarative mixin classes. For dataclass "
884 "field() objects, use a lambda:"
885 )
886 elif _is_declarative_props(obj):
887 # tried to get overloads to tell this to
888 # pylance, no luck
889 assert obj is not None
890
891 if obj._cascading:
892 if name in clsdict_view:
893 # unfortunately, while we can use the user-
894 # defined attribute here to allow a clean
895 # override, if there's another
896 # subclass below then it still tries to use
897 # this. not sure if there is enough
898 # information here to add this as a feature
899 # later on.
900 util.warn(
901 "Attribute '%s' on class %s cannot be "
902 "processed due to "
903 "@declared_attr.cascading; "
904 "skipping" % (name, cls)
905 )
906 collected_attributes[name] = column_copies[obj] = (
907 ret
908 ) = obj.__get__(obj, cls)
909 setattr(cls, name, ret)
910 else:
911 if is_dataclass_field:
912 # access attribute using normal class access
913 # first, to see if it's been mapped on a
914 # superclass. note if the dataclasses.field()
915 # has "default", this value can be anything.
916 ret = getattr(cls, name, None)
917
918 # so, if it's anything that's not ORM
919 # mapped, assume we should invoke the
920 # declared_attr
921 if not isinstance(ret, InspectionAttr):
922 ret = obj.fget()
923 else:
924 # access attribute using normal class access.
925 # if the declared attr already took place
926 # on a superclass that is mapped, then
927 # this is no longer a declared_attr, it will
928 # be the InstrumentedAttribute
929 ret = getattr(cls, name)
930
931 # correct for proxies created from hybrid_property
932 # or similar. note there is no known case that
933 # produces nested proxies, so we are only
934 # looking one level deep right now.
935
936 if (
937 isinstance(ret, InspectionAttr)
938 and attr_is_internal_proxy(ret)
939 and not isinstance(
940 ret.original_property, MapperProperty
941 )
942 ):
943 ret = ret.descriptor
944
945 collected_attributes[name] = column_copies[obj] = (
946 ret
947 )
948
949 if (
950 isinstance(ret, (Column, MapperProperty))
951 and ret.doc is None
952 ):
953 ret.doc = obj.__doc__
954
955 self._collect_annotation(
956 name,
957 obj._collect_return_annotation(),
958 base,
959 True,
960 obj,
961 )
962 elif _is_mapped_annotation(annotation, cls, base):
963 # Mapped annotation without any object.
964 # product_column_copies should have handled this.
965 # if future support for other MapperProperty,
966 # then test if this name is already handled and
967 # otherwise proceed to generate.
968 if not fixed_table:
969 assert (
970 name in collected_attributes
971 or attribute_is_overridden(name, None)
972 )
973 continue
974 else:
975 # here, the attribute is some other kind of
976 # property that we assume is not part of the
977 # declarative mapping. however, check for some
978 # more common mistakes
979 self._warn_for_decl_attributes(base, name, obj)
980 elif is_dataclass_field and (
981 name not in clsdict_view or clsdict_view[name] is not obj
982 ):
983 # here, we are definitely looking at the target class
984 # and not a superclass. this is currently a
985 # dataclass-only path. if the name is only
986 # a dataclass field and isn't in local cls.__dict__,
987 # put the object there.
988 # assert that the dataclass-enabled resolver agrees
989 # with what we are seeing
990
991 assert not attribute_is_overridden(name, obj)
992
993 if _is_declarative_props(obj):
994 obj = obj.fget()
995
996 collected_attributes[name] = obj
997 self._collect_annotation(
998 name, annotation, base, False, obj
999 )
1000 else:
1001 collected_annotation = self._collect_annotation(
1002 name, annotation, base, None, obj
1003 )
1004 is_mapped = (
1005 collected_annotation is not None
1006 and collected_annotation.mapped_container is not None
1007 )
1008 generated_obj = (
1009 collected_annotation.attr_value
1010 if collected_annotation is not None
1011 else obj
1012 )
1013 if obj is None and not fixed_table and is_mapped:
1014 collected_attributes[name] = (
1015 generated_obj
1016 if generated_obj is not None
1017 else MappedColumn()
1018 )
1019 elif name in clsdict_view:
1020 collected_attributes[name] = obj
1021 # else if the name is not in the cls.__dict__,
1022 # don't collect it as an attribute.
1023 # we will see the annotation only, which is meaningful
1024 # both for mapping and dataclasses setup
1025
1026 if inherited_table_args and not tablename:
1027 table_args = None
1028
1029 self.table_args = table_args
1030 self.tablename = tablename
1031 self.mapper_args_fn = mapper_args_fn
1032 self.table_fn = table_fn
1033
1034 def _setup_dataclasses_transforms(self) -> None:
1035 dataclass_setup_arguments = self.dataclass_setup_arguments
1036 if not dataclass_setup_arguments:
1037 return
1038
1039 # can't use is_dataclass since it uses hasattr
1040 if "__dataclass_fields__" in self.cls.__dict__:
1041 raise exc.InvalidRequestError(
1042 f"Class {self.cls} is already a dataclass; ensure that "
1043 "base classes / decorator styles of establishing dataclasses "
1044 "are not being mixed. "
1045 "This can happen if a class that inherits from "
1046 "'MappedAsDataclass', even indirectly, is been mapped with "
1047 "'@registry.mapped_as_dataclass'"
1048 )
1049
1050 # can't create a dataclass if __table__ is already there. This would
1051 # fail an assertion when calling _get_arguments_for_make_dataclass:
1052 # assert False, "Mapped[] received without a mapping declaration"
1053 if "__table__" in self.cls.__dict__:
1054 raise exc.InvalidRequestError(
1055 f"Class {self.cls} already defines a '__table__'. "
1056 "ORM Annotated Dataclasses do not support a pre-existing "
1057 "'__table__' element"
1058 )
1059
1060 warn_for_non_dc_attrs = collections.defaultdict(list)
1061
1062 def _allow_dataclass_field(
1063 key: str, originating_class: Type[Any]
1064 ) -> bool:
1065 if (
1066 originating_class is not self.cls
1067 and "__dataclass_fields__" not in originating_class.__dict__
1068 ):
1069 warn_for_non_dc_attrs[originating_class].append(key)
1070
1071 return True
1072
1073 manager = instrumentation.manager_of_class(self.cls)
1074 assert manager is not None
1075
1076 field_list = [
1077 _AttributeOptions._get_arguments_for_make_dataclass(
1078 self,
1079 key,
1080 anno,
1081 mapped_container,
1082 self.collected_attributes.get(key, _NoArg.NO_ARG),
1083 dataclass_setup_arguments,
1084 )
1085 for key, anno, mapped_container in (
1086 (
1087 key,
1088 mapped_anno if mapped_anno else raw_anno,
1089 mapped_container,
1090 )
1091 for key, (
1092 raw_anno,
1093 mapped_container,
1094 mapped_anno,
1095 is_dc,
1096 attr_value,
1097 originating_module,
1098 originating_class,
1099 ) in self.collected_annotations.items()
1100 if _allow_dataclass_field(key, originating_class)
1101 and (
1102 key not in self.collected_attributes
1103 # issue #9226; check for attributes that we've collected
1104 # which are already instrumented, which we would assume
1105 # mean we are in an ORM inheritance mapping and this
1106 # attribute is already mapped on the superclass. Under
1107 # no circumstance should any QueryableAttribute be sent to
1108 # the dataclass() function; anything that's mapped should
1109 # be Field and that's it
1110 or not isinstance(
1111 self.collected_attributes[key], QueryableAttribute
1112 )
1113 )
1114 )
1115 ]
1116 if warn_for_non_dc_attrs:
1117 for (
1118 originating_class,
1119 non_dc_attrs,
1120 ) in warn_for_non_dc_attrs.items():
1121 util.warn_deprecated(
1122 f"When transforming {self.cls} to a dataclass, "
1123 f"attribute(s) "
1124 f"{', '.join(repr(key) for key in non_dc_attrs)} "
1125 f"originates from superclass "
1126 f"{originating_class}, which is not a dataclass. This "
1127 f"usage is deprecated and will raise an error in "
1128 f"SQLAlchemy 2.1. When declaring SQLAlchemy Declarative "
1129 f"Dataclasses, ensure that all mixin classes and other "
1130 f"superclasses which include attributes are also a "
1131 f"subclass of MappedAsDataclass.",
1132 "2.0",
1133 code="dcmx",
1134 )
1135
1136 annotations = {}
1137 defaults = {}
1138 for item in field_list:
1139 if len(item) == 2:
1140 name, tp = item
1141 elif len(item) == 3:
1142 name, tp, spec = item
1143 defaults[name] = spec
1144 else:
1145 assert False
1146 annotations[name] = tp
1147
1148 for k, v in defaults.items():
1149 setattr(self.cls, k, v)
1150
1151 self._apply_dataclasses_to_any_class(
1152 dataclass_setup_arguments, self.cls, annotations
1153 )
1154
1155 @classmethod
1156 def _update_annotations_for_non_mapped_class(
1157 cls, klass: Type[_O]
1158 ) -> Mapping[str, _AnnotationScanType]:
1159 cls_annotations = util.get_annotations(klass)
1160
1161 new_anno = {}
1162 for name, annotation in cls_annotations.items():
1163 if _is_mapped_annotation(annotation, klass, klass):
1164 extracted = _extract_mapped_subtype(
1165 annotation,
1166 klass,
1167 klass.__module__,
1168 name,
1169 type(None),
1170 required=False,
1171 is_dataclass_field=False,
1172 expect_mapped=False,
1173 )
1174 if extracted:
1175 inner, _ = extracted
1176 new_anno[name] = inner
1177 else:
1178 new_anno[name] = annotation
1179 return new_anno
1180
1181 @classmethod
1182 def _apply_dataclasses_to_any_class(
1183 cls,
1184 dataclass_setup_arguments: _DataclassArguments,
1185 klass: Type[_O],
1186 use_annotations: Mapping[str, _AnnotationScanType],
1187 ) -> None:
1188 cls._assert_dc_arguments(dataclass_setup_arguments)
1189
1190 dataclass_callable = dataclass_setup_arguments["dataclass_callable"]
1191 if dataclass_callable is _NoArg.NO_ARG:
1192 dataclass_callable = dataclasses.dataclass
1193
1194 restored: Optional[Any]
1195
1196 if use_annotations:
1197 # apply constructed annotations that should look "normal" to a
1198 # dataclasses callable, based on the fields present. This
1199 # means remove the Mapped[] container and ensure all Field
1200 # entries have an annotation
1201 restored = getattr(klass, "__annotations__", None)
1202 klass.__annotations__ = cast("Dict[str, Any]", use_annotations)
1203 else:
1204 restored = None
1205
1206 try:
1207 dataclass_callable( # type: ignore[call-overload]
1208 klass,
1209 **{ # type: ignore[call-overload,unused-ignore]
1210 k: v
1211 for k, v in dataclass_setup_arguments.items()
1212 if v is not _NoArg.NO_ARG
1213 and k not in ("dataclass_callable",)
1214 },
1215 )
1216 except (TypeError, ValueError) as ex:
1217 raise exc.InvalidRequestError(
1218 f"Python dataclasses error encountered when creating "
1219 f"dataclass for {klass.__name__!r}: "
1220 f"{ex!r}. Please refer to Python dataclasses "
1221 "documentation for additional information.",
1222 code="dcte",
1223 ) from ex
1224 finally:
1225 # restore original annotations outside of the dataclasses
1226 # process; for mixins and __abstract__ superclasses, SQLAlchemy
1227 # Declarative will need to see the Mapped[] container inside the
1228 # annotations in order to map subclasses
1229 if use_annotations:
1230 if restored is None:
1231 del klass.__annotations__
1232 else:
1233 klass.__annotations__ = restored
1234
1235 @classmethod
1236 def _assert_dc_arguments(cls, arguments: _DataclassArguments) -> None:
1237 allowed = {
1238 "init",
1239 "repr",
1240 "order",
1241 "eq",
1242 "unsafe_hash",
1243 "kw_only",
1244 "match_args",
1245 "dataclass_callable",
1246 }
1247 disallowed_args = set(arguments).difference(allowed)
1248 if disallowed_args:
1249 msg = ", ".join(f"{arg!r}" for arg in sorted(disallowed_args))
1250 raise exc.ArgumentError(
1251 f"Dataclass argument(s) {msg} are not accepted"
1252 )
1253
1254 def _collect_annotation(
1255 self,
1256 name: str,
1257 raw_annotation: _AnnotationScanType,
1258 originating_class: Type[Any],
1259 expect_mapped: Optional[bool],
1260 attr_value: Any,
1261 ) -> Optional[_CollectedAnnotation]:
1262 if name in self.collected_annotations:
1263 return self.collected_annotations[name]
1264
1265 if raw_annotation is None:
1266 return None
1267
1268 is_dataclass = self.is_dataclass_prior_to_mapping
1269 allow_unmapped = self.allow_unmapped_annotations
1270
1271 if expect_mapped is None:
1272 is_dataclass_field = isinstance(attr_value, dataclasses.Field)
1273 expect_mapped = (
1274 not is_dataclass_field
1275 and not allow_unmapped
1276 and (
1277 attr_value is None
1278 or isinstance(attr_value, _MappedAttribute)
1279 )
1280 )
1281
1282 is_dataclass_field = False
1283 extracted = _extract_mapped_subtype(
1284 raw_annotation,
1285 self.cls,
1286 originating_class.__module__,
1287 name,
1288 type(attr_value),
1289 required=False,
1290 is_dataclass_field=is_dataclass_field,
1291 expect_mapped=expect_mapped and not is_dataclass,
1292 )
1293 if extracted is None:
1294 # ClassVar can come out here
1295 return None
1296
1297 extracted_mapped_annotation, mapped_container = extracted
1298
1299 if attr_value is None and not is_literal(extracted_mapped_annotation):
1300 for elem in get_args(extracted_mapped_annotation):
1301 if is_fwd_ref(
1302 elem, check_generic=True, check_for_plain_string=True
1303 ):
1304 elem = de_stringify_annotation(
1305 self.cls,
1306 elem,
1307 originating_class.__module__,
1308 include_generic=True,
1309 )
1310 # look in Annotated[...] for an ORM construct,
1311 # such as Annotated[int, mapped_column(primary_key=True)]
1312 if isinstance(elem, _IntrospectsAnnotations):
1313 attr_value = elem.found_in_pep593_annotated()
1314
1315 self.collected_annotations[name] = ca = _CollectedAnnotation(
1316 raw_annotation,
1317 mapped_container,
1318 extracted_mapped_annotation,
1319 is_dataclass,
1320 attr_value,
1321 originating_class.__module__,
1322 originating_class,
1323 )
1324 return ca
1325
1326 def _warn_for_decl_attributes(
1327 self, cls: Type[Any], key: str, c: Any
1328 ) -> None:
1329 if isinstance(c, expression.ColumnElement):
1330 util.warn(
1331 f"Attribute '{key}' on class {cls} appears to "
1332 "be a non-schema SQLAlchemy expression "
1333 "object; this won't be part of the declarative mapping. "
1334 "To map arbitrary expressions, use ``column_property()`` "
1335 "or a similar function such as ``deferred()``, "
1336 "``query_expression()`` etc. "
1337 )
1338
1339 def _produce_column_copies(
1340 self,
1341 attributes_for_class: Callable[
1342 [], Iterable[Tuple[str, Any, Any, bool]]
1343 ],
1344 attribute_is_overridden: Callable[[str, Any], bool],
1345 fixed_table: bool,
1346 originating_class: Type[Any],
1347 ) -> Dict[str, Union[Column[Any], MappedColumn[Any]]]:
1348 cls = self.cls
1349 dict_ = self.clsdict_view
1350 locally_collected_attributes = {}
1351 column_copies = self.column_copies
1352 # copy mixin columns to the mapped class
1353
1354 for name, obj, annotation, is_dataclass in attributes_for_class():
1355 if (
1356 not fixed_table
1357 and obj is None
1358 and _is_mapped_annotation(annotation, cls, originating_class)
1359 ):
1360 # obj is None means this is the annotation only path
1361
1362 if attribute_is_overridden(name, obj):
1363 # perform same "overridden" check as we do for
1364 # Column/MappedColumn, this is how a mixin col is not
1365 # applied to an inherited subclass that does not have
1366 # the mixin. the anno-only path added here for
1367 # #9564
1368 continue
1369
1370 collected_annotation = self._collect_annotation(
1371 name, annotation, originating_class, True, obj
1372 )
1373 obj = (
1374 collected_annotation.attr_value
1375 if collected_annotation is not None
1376 else obj
1377 )
1378 if obj is None:
1379 obj = MappedColumn()
1380
1381 locally_collected_attributes[name] = obj
1382 setattr(cls, name, obj)
1383
1384 elif isinstance(obj, (Column, MappedColumn)):
1385 if attribute_is_overridden(name, obj):
1386 # if column has been overridden
1387 # (like by the InstrumentedAttribute of the
1388 # superclass), skip. don't collect the annotation
1389 # either (issue #8718)
1390 continue
1391
1392 collected_annotation = self._collect_annotation(
1393 name, annotation, originating_class, True, obj
1394 )
1395 obj = (
1396 collected_annotation.attr_value
1397 if collected_annotation is not None
1398 else obj
1399 )
1400
1401 if name not in dict_ and not (
1402 "__table__" in dict_
1403 and (getattr(obj, "name", None) or name)
1404 in dict_["__table__"].c
1405 ):
1406 if obj.foreign_keys:
1407 for fk in obj.foreign_keys:
1408 if (
1409 fk._table_column is not None
1410 and fk._table_column.table is None
1411 ):
1412 raise exc.InvalidRequestError(
1413 "Columns with foreign keys to "
1414 "non-table-bound "
1415 "columns must be declared as "
1416 "@declared_attr callables "
1417 "on declarative mixin classes. "
1418 "For dataclass "
1419 "field() objects, use a lambda:."
1420 )
1421
1422 column_copies[obj] = copy_ = obj._copy()
1423
1424 locally_collected_attributes[name] = copy_
1425 setattr(cls, name, copy_)
1426
1427 return locally_collected_attributes
1428
1429 def _extract_mappable_attributes(self) -> None:
1430 cls = self.cls
1431 collected_attributes = self.collected_attributes
1432
1433 our_stuff = self.properties
1434
1435 _include_dunders = self._include_dunders
1436
1437 late_mapped = _get_immediate_cls_attr(
1438 cls, "_sa_decl_prepare_nocascade", strict=True
1439 )
1440
1441 allow_unmapped_annotations = self.allow_unmapped_annotations
1442 expect_annotations_wo_mapped = (
1443 allow_unmapped_annotations or self.is_dataclass_prior_to_mapping
1444 )
1445
1446 look_for_dataclass_things = bool(self.dataclass_setup_arguments)
1447
1448 for k in list(collected_attributes):
1449 if k in _include_dunders:
1450 continue
1451
1452 value = collected_attributes[k]
1453
1454 if _is_declarative_props(value):
1455 # @declared_attr in collected_attributes only occurs here for a
1456 # @declared_attr that's directly on the mapped class;
1457 # for a mixin, these have already been evaluated
1458 if value._cascading:
1459 util.warn(
1460 "Use of @declared_attr.cascading only applies to "
1461 "Declarative 'mixin' and 'abstract' classes. "
1462 "Currently, this flag is ignored on mapped class "
1463 "%s" % self.cls
1464 )
1465
1466 value = getattr(cls, k)
1467
1468 elif (
1469 isinstance(value, QueryableAttribute)
1470 and value.class_ is not cls
1471 and value.key != k
1472 ):
1473 # detect a QueryableAttribute that's already mapped being
1474 # assigned elsewhere in userland, turn into a synonym()
1475 value = SynonymProperty(value.key)
1476 setattr(cls, k, value)
1477
1478 if (
1479 isinstance(value, tuple)
1480 and len(value) == 1
1481 and isinstance(value[0], (Column, _MappedAttribute))
1482 ):
1483 util.warn(
1484 "Ignoring declarative-like tuple value of attribute "
1485 "'%s': possibly a copy-and-paste error with a comma "
1486 "accidentally placed at the end of the line?" % k
1487 )
1488 continue
1489 elif look_for_dataclass_things and isinstance(
1490 value, dataclasses.Field
1491 ):
1492 # we collected a dataclass Field; dataclasses would have
1493 # set up the correct state on the class
1494 continue
1495 elif not isinstance(value, (Column, _DCAttributeOptions)):
1496 # using @declared_attr for some object that
1497 # isn't Column/MapperProperty/_DCAttributeOptions; remove
1498 # from the clsdict_view
1499 # and place the evaluated value onto the class.
1500 collected_attributes.pop(k)
1501 self._warn_for_decl_attributes(cls, k, value)
1502 if not late_mapped:
1503 setattr(cls, k, value)
1504 continue
1505 # we expect to see the name 'metadata' in some valid cases;
1506 # however at this point we see it's assigned to something trying
1507 # to be mapped, so raise for that.
1508 # TODO: should "registry" here be also? might be too late
1509 # to change that now (2.0 betas)
1510 elif k in ("metadata",):
1511 raise exc.InvalidRequestError(
1512 f"Attribute name '{k}' is reserved when using the "
1513 "Declarative API."
1514 )
1515 elif isinstance(value, Column):
1516 _undefer_column_name(
1517 k, self.column_copies.get(value, value) # type: ignore
1518 )
1519 else:
1520 if isinstance(value, _IntrospectsAnnotations):
1521 (
1522 annotation,
1523 mapped_container,
1524 extracted_mapped_annotation,
1525 is_dataclass,
1526 attr_value,
1527 originating_module,
1528 originating_class,
1529 ) = self.collected_annotations.get(
1530 k, (None, None, None, False, None, None, None)
1531 )
1532
1533 # issue #8692 - don't do any annotation interpretation if
1534 # an annotation were present and a container such as
1535 # Mapped[] etc. were not used. If annotation is None,
1536 # do declarative_scan so that the property can raise
1537 # for required
1538 if (
1539 mapped_container is not None
1540 or annotation is None
1541 # issue #10516: need to do declarative_scan even with
1542 # a non-Mapped annotation if we are doing
1543 # __allow_unmapped__, for things like col.name
1544 # assignment
1545 or allow_unmapped_annotations
1546 ):
1547 try:
1548 value.declarative_scan(
1549 self,
1550 self.registry,
1551 cls,
1552 originating_module,
1553 k,
1554 mapped_container,
1555 annotation,
1556 extracted_mapped_annotation,
1557 is_dataclass,
1558 )
1559 except NameError as ne:
1560 raise orm_exc.MappedAnnotationError(
1561 f"Could not resolve all types within mapped "
1562 f'annotation: "{annotation}". Ensure all '
1563 f"types are written correctly and are "
1564 f"imported within the module in use."
1565 ) from ne
1566 else:
1567 # assert that we were expecting annotations
1568 # without Mapped[] were going to be passed.
1569 # otherwise an error should have been raised
1570 # by util._extract_mapped_subtype before we got here.
1571 assert expect_annotations_wo_mapped
1572
1573 if isinstance(value, _DCAttributeOptions):
1574 if (
1575 value._has_dataclass_arguments
1576 and not look_for_dataclass_things
1577 ):
1578 if isinstance(value, MapperProperty):
1579 argnames = [
1580 "init",
1581 "default_factory",
1582 "repr",
1583 "default",
1584 "dataclass_metadata",
1585 ]
1586 else:
1587 argnames = [
1588 "init",
1589 "default_factory",
1590 "repr",
1591 "dataclass_metadata",
1592 ]
1593
1594 args = {
1595 a
1596 for a in argnames
1597 if getattr(
1598 value._attribute_options, f"dataclasses_{a}"
1599 )
1600 is not _NoArg.NO_ARG
1601 }
1602
1603 raise exc.ArgumentError(
1604 f"Attribute '{k}' on class {cls} includes "
1605 f"dataclasses argument(s): "
1606 f"{', '.join(sorted(repr(a) for a in args))} but "
1607 f"class does not specify "
1608 "SQLAlchemy native dataclass configuration."
1609 )
1610
1611 if not isinstance(value, (MapperProperty, _MapsColumns)):
1612 # filter for _DCAttributeOptions objects that aren't
1613 # MapperProperty / mapped_column(). Currently this
1614 # includes AssociationProxy. pop it from the things
1615 # we're going to map and set it up as a descriptor
1616 # on the class.
1617 collected_attributes.pop(k)
1618
1619 # Assoc Prox (or other descriptor object that may
1620 # use _DCAttributeOptions) is usually here, except if
1621 # 1. we're a
1622 # dataclass, dataclasses would have removed the
1623 # attr here or 2. assoc proxy is coming from a
1624 # superclass, we want it to be direct here so it
1625 # tracks state or 3. assoc prox comes from
1626 # declared_attr, uncommon case
1627 setattr(cls, k, value)
1628 continue
1629
1630 our_stuff[k] = value
1631
1632 def _extract_declared_columns(self) -> None:
1633 our_stuff = self.properties
1634
1635 # extract columns from the class dict
1636 declared_columns = self.declared_columns
1637 column_ordering = self.column_ordering
1638 name_to_prop_key = collections.defaultdict(set)
1639
1640 for key, c in list(our_stuff.items()):
1641 if isinstance(c, _MapsColumns):
1642 mp_to_assign = c.mapper_property_to_assign
1643 if mp_to_assign:
1644 our_stuff[key] = mp_to_assign
1645 else:
1646 # if no mapper property to assign, this currently means
1647 # this is a MappedColumn that will produce a Column for us
1648 del our_stuff[key]
1649
1650 for col, sort_order in c.columns_to_assign:
1651 if not isinstance(c, CompositeProperty):
1652 name_to_prop_key[col.name].add(key)
1653 declared_columns.add(col)
1654
1655 # we would assert this, however we want the below
1656 # warning to take effect instead. See #9630
1657 # assert col not in column_ordering
1658
1659 column_ordering[col] = sort_order
1660
1661 # if this is a MappedColumn and the attribute key we
1662 # have is not what the column has for its key, map the
1663 # Column explicitly under the attribute key name.
1664 # otherwise, Mapper will map it under the column key.
1665 if mp_to_assign is None and key != col.key:
1666 our_stuff[key] = col
1667 elif isinstance(c, Column):
1668 # undefer previously occurred here, and now occurs earlier.
1669 # ensure every column we get here has been named
1670 assert c.name is not None
1671 name_to_prop_key[c.name].add(key)
1672 declared_columns.add(c)
1673 # if the column is the same name as the key,
1674 # remove it from the explicit properties dict.
1675 # the normal rules for assigning column-based properties
1676 # will take over, including precedence of columns
1677 # in multi-column ColumnProperties.
1678 if key == c.key:
1679 del our_stuff[key]
1680
1681 for name, keys in name_to_prop_key.items():
1682 if len(keys) > 1:
1683 util.warn(
1684 "On class %r, Column object %r named "
1685 "directly multiple times, "
1686 "only one will be used: %s. "
1687 "Consider using orm.synonym instead"
1688 % (self.classname, name, (", ".join(sorted(keys))))
1689 )
1690
1691 def _setup_table(self, table: Optional[FromClause] = None) -> None:
1692 cls = self.cls
1693 cls_as_Decl = cast("MappedClassProtocol[Any]", cls)
1694
1695 tablename = self.tablename
1696 table_args = self.table_args
1697 clsdict_view = self.clsdict_view
1698 declared_columns = self.declared_columns
1699 column_ordering = self.column_ordering
1700
1701 manager = attributes.manager_of_class(cls)
1702
1703 if (
1704 self.table_fn is None
1705 and "__table__" not in clsdict_view
1706 and table is None
1707 ):
1708 if hasattr(cls, "__table_cls__"):
1709 table_cls = cast(
1710 Type[Table],
1711 util.unbound_method_to_callable(cls.__table_cls__), # type: ignore # noqa: E501
1712 )
1713 else:
1714 table_cls = Table
1715
1716 if tablename is not None:
1717 args: Tuple[Any, ...] = ()
1718 table_kw: Dict[str, Any] = {}
1719
1720 if table_args:
1721 if isinstance(table_args, dict):
1722 table_kw = table_args
1723 elif isinstance(table_args, tuple):
1724 if isinstance(table_args[-1], dict):
1725 args, table_kw = table_args[0:-1], table_args[-1]
1726 else:
1727 args = table_args
1728
1729 autoload_with = clsdict_view.get("__autoload_with__")
1730 if autoload_with:
1731 table_kw["autoload_with"] = autoload_with
1732
1733 autoload = clsdict_view.get("__autoload__")
1734 if autoload:
1735 table_kw["autoload"] = True
1736
1737 sorted_columns = sorted(
1738 declared_columns,
1739 key=lambda c: column_ordering.get(c, 0),
1740 )
1741 table = self.set_cls_attribute(
1742 "__table__",
1743 table_cls(
1744 tablename,
1745 self._metadata_for_cls(manager),
1746 *sorted_columns,
1747 *args,
1748 **table_kw,
1749 ),
1750 )
1751 else:
1752 if table is None:
1753 if self.table_fn:
1754 table = self.set_cls_attribute(
1755 "__table__", self.table_fn()
1756 )
1757 else:
1758 table = cls_as_Decl.__table__
1759 if declared_columns:
1760 for c in declared_columns:
1761 if not table.c.contains_column(c):
1762 raise exc.ArgumentError(
1763 "Can't add additional column %r when "
1764 "specifying __table__" % c.key
1765 )
1766
1767 self.local_table = table
1768
1769 def _metadata_for_cls(self, manager: ClassManager[Any]) -> MetaData:
1770 meta: Optional[MetaData] = getattr(self.cls, "metadata", None)
1771 if meta is not None:
1772 return meta
1773 else:
1774 return manager.registry.metadata
1775
1776 def _setup_inheriting_mapper(self, mapper_kw: _MapperKwArgs) -> None:
1777 cls = self.cls
1778
1779 inherits = mapper_kw.get("inherits", None)
1780
1781 if inherits is None:
1782 # since we search for classical mappings now, search for
1783 # multiple mapped bases as well and raise an error.
1784 inherits_search = []
1785 for base_ in cls.__bases__:
1786 c = _resolve_for_abstract_or_classical(base_)
1787 if c is None:
1788 continue
1789
1790 if _is_supercls_for_inherits(c) and c not in inherits_search:
1791 inherits_search.append(c)
1792
1793 if inherits_search:
1794 if len(inherits_search) > 1:
1795 raise exc.InvalidRequestError(
1796 "Class %s has multiple mapped bases: %r"
1797 % (cls, inherits_search)
1798 )
1799 inherits = inherits_search[0]
1800 elif isinstance(inherits, Mapper):
1801 inherits = inherits.class_
1802
1803 self.inherits = inherits
1804
1805 clsdict_view = self.clsdict_view
1806 if "__table__" not in clsdict_view and self.tablename is None:
1807 self.single = True
1808
1809 def _setup_inheriting_columns(self, mapper_kw: _MapperKwArgs) -> None:
1810 table = self.local_table
1811 cls = self.cls
1812 table_args = self.table_args
1813 declared_columns = self.declared_columns
1814
1815 if (
1816 table is None
1817 and self.inherits is None
1818 and not _get_immediate_cls_attr(cls, "__no_table__")
1819 ):
1820 raise exc.InvalidRequestError(
1821 "Class %r does not have a __table__ or __tablename__ "
1822 "specified and does not inherit from an existing "
1823 "table-mapped class." % cls
1824 )
1825 elif self.inherits:
1826 inherited_mapper_or_config = _declared_mapping_info(self.inherits)
1827 assert inherited_mapper_or_config is not None
1828 inherited_table = inherited_mapper_or_config.local_table
1829 inherited_persist_selectable = (
1830 inherited_mapper_or_config.persist_selectable
1831 )
1832
1833 if table is None:
1834 # single table inheritance.
1835 # ensure no table args
1836 if table_args:
1837 raise exc.ArgumentError(
1838 "Can't place __table_args__ on an inherited class "
1839 "with no table."
1840 )
1841
1842 # add any columns declared here to the inherited table.
1843 if declared_columns and not isinstance(inherited_table, Table):
1844 raise exc.ArgumentError(
1845 f"Can't declare columns on single-table-inherited "
1846 f"subclass {self.cls}; superclass {self.inherits} "
1847 "is not mapped to a Table"
1848 )
1849
1850 for col in declared_columns:
1851 assert inherited_table is not None
1852 if col.name in inherited_table.c:
1853 if inherited_table.c[col.name] is col:
1854 continue
1855 raise exc.ArgumentError(
1856 f"Column '{col}' on class {cls.__name__} "
1857 f"conflicts with existing column "
1858 f"'{inherited_table.c[col.name]}'. If using "
1859 f"Declarative, consider using the "
1860 "use_existing_column parameter of mapped_column() "
1861 "to resolve conflicts."
1862 )
1863 if col.primary_key:
1864 raise exc.ArgumentError(
1865 "Can't place primary key columns on an inherited "
1866 "class with no table."
1867 )
1868
1869 if TYPE_CHECKING:
1870 assert isinstance(inherited_table, Table)
1871
1872 inherited_table.append_column(col)
1873 if (
1874 inherited_persist_selectable is not None
1875 and inherited_persist_selectable is not inherited_table
1876 ):
1877 inherited_persist_selectable._refresh_for_new_column(
1878 col
1879 )
1880
1881 def _prepare_mapper_arguments(self, mapper_kw: _MapperKwArgs) -> None:
1882 properties = self.properties
1883
1884 if self.mapper_args_fn:
1885 mapper_args = self.mapper_args_fn()
1886 else:
1887 mapper_args = {}
1888
1889 if mapper_kw:
1890 mapper_args.update(mapper_kw)
1891
1892 if "properties" in mapper_args:
1893 properties = dict(properties)
1894 properties.update(mapper_args["properties"])
1895
1896 # make sure that column copies are used rather
1897 # than the original columns from any mixins
1898 for k in ("version_id_col", "polymorphic_on"):
1899 if k in mapper_args:
1900 v = mapper_args[k]
1901 mapper_args[k] = self.column_copies.get(v, v)
1902
1903 if "primary_key" in mapper_args:
1904 mapper_args["primary_key"] = [
1905 self.column_copies.get(v, v)
1906 for v in util.to_list(mapper_args["primary_key"])
1907 ]
1908
1909 if "inherits" in mapper_args:
1910 inherits_arg = mapper_args["inherits"]
1911 if isinstance(inherits_arg, Mapper):
1912 inherits_arg = inherits_arg.class_
1913
1914 if inherits_arg is not self.inherits:
1915 raise exc.InvalidRequestError(
1916 "mapper inherits argument given for non-inheriting "
1917 "class %s" % (mapper_args["inherits"])
1918 )
1919
1920 if self.inherits:
1921 mapper_args["inherits"] = self.inherits
1922
1923 if self.inherits and not mapper_args.get("concrete", False):
1924 # note the superclass is expected to have a Mapper assigned and
1925 # not be a deferred config, as this is called within map()
1926 inherited_mapper = class_mapper(self.inherits, False)
1927 inherited_table = inherited_mapper.local_table
1928
1929 # single or joined inheritance
1930 # exclude any cols on the inherited table which are
1931 # not mapped on the parent class, to avoid
1932 # mapping columns specific to sibling/nephew classes
1933 if "exclude_properties" not in mapper_args:
1934 mapper_args["exclude_properties"] = exclude_properties = {
1935 c.key
1936 for c in inherited_table.c
1937 if c not in inherited_mapper._columntoproperty
1938 }.union(inherited_mapper.exclude_properties or ())
1939 exclude_properties.difference_update(
1940 [c.key for c in self.declared_columns]
1941 )
1942
1943 # look through columns in the current mapper that
1944 # are keyed to a propname different than the colname
1945 # (if names were the same, we'd have popped it out above,
1946 # in which case the mapper makes this combination).
1947 # See if the superclass has a similar column property.
1948 # If so, join them together.
1949 for k, col in list(properties.items()):
1950 if not isinstance(col, expression.ColumnElement):
1951 continue
1952 if k in inherited_mapper._props:
1953 p = inherited_mapper._props[k]
1954 if isinstance(p, ColumnProperty):
1955 # note here we place the subclass column
1956 # first. See [ticket:1892] for background.
1957 properties[k] = [col] + p.columns
1958 result_mapper_args = mapper_args.copy()
1959 result_mapper_args["properties"] = properties
1960 self.mapper_args = result_mapper_args
1961
1962 def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]:
1963 self._prepare_mapper_arguments(mapper_kw)
1964 if hasattr(self.cls, "__mapper_cls__"):
1965 mapper_cls = cast(
1966 "Type[Mapper[Any]]",
1967 util.unbound_method_to_callable(
1968 self.cls.__mapper_cls__ # type: ignore
1969 ),
1970 )
1971 else:
1972 mapper_cls = Mapper
1973
1974 return self.set_cls_attribute(
1975 "__mapper__",
1976 mapper_cls(self.cls, self.local_table, **self.mapper_args),
1977 )
1978
1979
1980@util.preload_module("sqlalchemy.orm.decl_api")
1981def _as_dc_declaredattr(
1982 field_metadata: Mapping[str, Any], sa_dataclass_metadata_key: str
1983) -> Any:
1984 # wrap lambdas inside dataclass fields inside an ad-hoc declared_attr.
1985 # we can't write it because field.metadata is immutable :( so we have
1986 # to go through extra trouble to compare these
1987 decl_api = util.preloaded.orm_decl_api
1988 obj = field_metadata[sa_dataclass_metadata_key]
1989 if callable(obj) and not isinstance(obj, decl_api.declared_attr):
1990 return decl_api.declared_attr(obj)
1991 else:
1992 return obj
1993
1994
1995class _DeferredMapperConfig(_ClassScanMapperConfig):
1996 _cls: weakref.ref[Type[Any]]
1997
1998 is_deferred = True
1999
2000 _configs: util.OrderedDict[
2001 weakref.ref[Type[Any]], _DeferredMapperConfig
2002 ] = util.OrderedDict()
2003
2004 def _early_mapping(self, mapper_kw: _MapperKwArgs) -> None:
2005 pass
2006
2007 @property
2008 def cls(self) -> Type[Any]:
2009 return self._cls() # type: ignore
2010
2011 @cls.setter
2012 def cls(self, class_: Type[Any]) -> None:
2013 self._cls = weakref.ref(class_, self._remove_config_cls)
2014 self._configs[self._cls] = self
2015
2016 @classmethod
2017 def _remove_config_cls(cls, ref: weakref.ref[Type[Any]]) -> None:
2018 cls._configs.pop(ref, None)
2019
2020 @classmethod
2021 def has_cls(cls, class_: Type[Any]) -> bool:
2022 # 2.6 fails on weakref if class_ is an old style class
2023 return isinstance(class_, type) and weakref.ref(class_) in cls._configs
2024
2025 @classmethod
2026 def raise_unmapped_for_cls(cls, class_: Type[Any]) -> NoReturn:
2027 if hasattr(class_, "_sa_raise_deferred_config"):
2028 class_._sa_raise_deferred_config()
2029
2030 raise orm_exc.UnmappedClassError(
2031 class_,
2032 msg=(
2033 f"Class {orm_exc._safe_cls_name(class_)} has a deferred "
2034 "mapping on it. It is not yet usable as a mapped class."
2035 ),
2036 )
2037
2038 @classmethod
2039 def config_for_cls(cls, class_: Type[Any]) -> _DeferredMapperConfig:
2040 return cls._configs[weakref.ref(class_)]
2041
2042 @classmethod
2043 def classes_for_base(
2044 cls, base_cls: Type[Any], sort: bool = True
2045 ) -> List[_DeferredMapperConfig]:
2046 classes_for_base = [
2047 m
2048 for m, cls_ in [(m, m.cls) for m in cls._configs.values()]
2049 if cls_ is not None and issubclass(cls_, base_cls)
2050 ]
2051
2052 if not sort:
2053 return classes_for_base
2054
2055 all_m_by_cls = {m.cls: m for m in classes_for_base}
2056
2057 tuples: List[Tuple[_DeferredMapperConfig, _DeferredMapperConfig]] = []
2058 for m_cls in all_m_by_cls:
2059 tuples.extend(
2060 (all_m_by_cls[base_cls], all_m_by_cls[m_cls])
2061 for base_cls in m_cls.__bases__
2062 if base_cls in all_m_by_cls
2063 )
2064 return list(topological.sort(tuples, classes_for_base))
2065
2066 def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]:
2067 self._configs.pop(self._cls, None)
2068 return super().map(mapper_kw)
2069
2070
2071def _add_attribute(
2072 cls: Type[Any], key: str, value: MapperProperty[Any]
2073) -> None:
2074 """add an attribute to an existing declarative class.
2075
2076 This runs through the logic to determine MapperProperty,
2077 adds it to the Mapper, adds a column to the mapped Table, etc.
2078
2079 """
2080
2081 if "__mapper__" in cls.__dict__:
2082 mapped_cls = cast("MappedClassProtocol[Any]", cls)
2083
2084 def _table_or_raise(mc: MappedClassProtocol[Any]) -> Table:
2085 if isinstance(mc.__table__, Table):
2086 return mc.__table__
2087 raise exc.InvalidRequestError(
2088 f"Cannot add a new attribute to mapped class {mc.__name__!r} "
2089 "because it's not mapped against a table."
2090 )
2091
2092 if isinstance(value, Column):
2093 _undefer_column_name(key, value)
2094 _table_or_raise(mapped_cls).append_column(
2095 value, replace_existing=True
2096 )
2097 mapped_cls.__mapper__.add_property(key, value)
2098 elif isinstance(value, _MapsColumns):
2099 mp = value.mapper_property_to_assign
2100 for col, _ in value.columns_to_assign:
2101 _undefer_column_name(key, col)
2102 _table_or_raise(mapped_cls).append_column(
2103 col, replace_existing=True
2104 )
2105 if not mp:
2106 mapped_cls.__mapper__.add_property(key, col)
2107 if mp:
2108 mapped_cls.__mapper__.add_property(key, mp)
2109 elif isinstance(value, MapperProperty):
2110 mapped_cls.__mapper__.add_property(key, value)
2111 elif isinstance(value, QueryableAttribute) and value.key != key:
2112 # detect a QueryableAttribute that's already mapped being
2113 # assigned elsewhere in userland, turn into a synonym()
2114 value = SynonymProperty(value.key)
2115 mapped_cls.__mapper__.add_property(key, value)
2116 else:
2117 type.__setattr__(cls, key, value)
2118 mapped_cls.__mapper__._expire_memoizations()
2119 else:
2120 type.__setattr__(cls, key, value)
2121
2122
2123def _del_attribute(cls: Type[Any], key: str) -> None:
2124 if (
2125 "__mapper__" in cls.__dict__
2126 and key in cls.__dict__
2127 and not cast(
2128 "MappedClassProtocol[Any]", cls
2129 ).__mapper__._dispose_called
2130 ):
2131 value = cls.__dict__[key]
2132 if isinstance(
2133 value, (Column, _MapsColumns, MapperProperty, QueryableAttribute)
2134 ):
2135 raise NotImplementedError(
2136 "Can't un-map individual mapped attributes on a mapped class."
2137 )
2138 else:
2139 type.__delattr__(cls, key)
2140 cast(
2141 "MappedClassProtocol[Any]", cls
2142 ).__mapper__._expire_memoizations()
2143 else:
2144 type.__delattr__(cls, key)
2145
2146
2147def _declarative_constructor(self: Any, **kwargs: Any) -> None:
2148 """A simple constructor that allows initialization from kwargs.
2149
2150 Sets attributes on the constructed instance using the names and
2151 values in ``kwargs``.
2152
2153 Only keys that are present as
2154 attributes of the instance's class are allowed. These could be,
2155 for example, any mapped columns or relationships.
2156 """
2157 cls_ = type(self)
2158 for k in kwargs:
2159 if not hasattr(cls_, k):
2160 raise TypeError(
2161 "%r is an invalid keyword argument for %s" % (k, cls_.__name__)
2162 )
2163 setattr(self, k, kwargs[k])
2164
2165
2166_declarative_constructor.__name__ = "__init__"
2167
2168
2169def _undefer_column_name(key: str, column: Column[Any]) -> None:
2170 if column.key is None:
2171 column.key = key
2172 if column.name is None:
2173 column.name = key