1# orm/decl_base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Internal implementation for declarative."""
9
10from __future__ import annotations
11
12import collections
13import dataclasses
14import re
15from typing import Any
16from typing import Callable
17from typing import cast
18from typing import Dict
19from typing import Iterable
20from typing import List
21from typing import Mapping
22from typing import NamedTuple
23from typing import NoReturn
24from typing import Optional
25from typing import Sequence
26from typing import Tuple
27from typing import Type
28from typing import TYPE_CHECKING
29from typing import TypeVar
30from typing import Union
31import weakref
32
33from . import attributes
34from . import clsregistry
35from . import exc as orm_exc
36from . import instrumentation
37from . import mapperlib
38from ._typing import _O
39from ._typing import attr_is_internal_proxy
40from .attributes import InstrumentedAttribute
41from .attributes import QueryableAttribute
42from .base import _is_mapped_class
43from .base import InspectionAttr
44from .descriptor_props import CompositeProperty
45from .descriptor_props import SynonymProperty
46from .interfaces import _AttributeOptions
47from .interfaces import _DCAttributeOptions
48from .interfaces import _IntrospectsAnnotations
49from .interfaces import _MappedAttribute
50from .interfaces import _MapsColumns
51from .interfaces import MapperProperty
52from .mapper import Mapper
53from .properties import ColumnProperty
54from .properties import MappedColumn
55from .util import _extract_mapped_subtype
56from .util import _is_mapped_annotation
57from .util import class_mapper
58from .util import de_stringify_annotation
59from .. import event
60from .. import exc
61from .. import util
62from ..sql import expression
63from ..sql.base import _NoArg
64from ..sql.schema import Column
65from ..sql.schema import Table
66from ..util import topological
67from ..util.typing import _AnnotationScanType
68from ..util.typing import get_args
69from ..util.typing import is_fwd_ref
70from ..util.typing import is_literal
71from ..util.typing import Protocol
72from ..util.typing import TypedDict
73
74if TYPE_CHECKING:
75 from ._typing import _ClassDict
76 from ._typing import _RegistryType
77 from .base import Mapped
78 from .decl_api import declared_attr
79 from .instrumentation import ClassManager
80 from ..sql.elements import NamedColumn
81 from ..sql.schema import MetaData
82 from ..sql.selectable import FromClause
83
84_T = TypeVar("_T", bound=Any)
85
86_MapperKwArgs = Mapping[str, Any]
87_TableArgsType = Union[Tuple[Any, ...], Dict[str, Any]]
88
89
90class MappedClassProtocol(Protocol[_O]):
91 """A protocol representing a SQLAlchemy mapped class.
92
93 The protocol is generic on the type of class, use
94 ``MappedClassProtocol[Any]`` to allow any mapped class.
95 """
96
97 __name__: str
98 __mapper__: Mapper[_O]
99 __table__: FromClause
100
101 def __call__(self, **kw: Any) -> _O: ...
102
103
104class _DeclMappedClassProtocol(MappedClassProtocol[_O], Protocol):
105 "Internal more detailed version of ``MappedClassProtocol``."
106
107 metadata: MetaData
108 __tablename__: str
109 __mapper_args__: _MapperKwArgs
110 __table_args__: Optional[_TableArgsType]
111
112 _sa_apply_dc_transforms: Optional[_DataclassArguments]
113
114 def __declare_first__(self) -> None: ...
115
116 def __declare_last__(self) -> None: ...
117
118
119class _DataclassArguments(TypedDict):
120 init: Union[_NoArg, bool]
121 repr: Union[_NoArg, bool]
122 eq: Union[_NoArg, bool]
123 order: Union[_NoArg, bool]
124 unsafe_hash: Union[_NoArg, bool]
125 match_args: Union[_NoArg, bool]
126 kw_only: Union[_NoArg, bool]
127 dataclass_callable: Union[_NoArg, Callable[..., Type[Any]]]
128
129
130def _declared_mapping_info(
131 cls: Type[Any],
132) -> Optional[Union[_DeferredMapperConfig, Mapper[Any]]]:
133 # deferred mapping
134 if _DeferredMapperConfig.has_cls(cls):
135 return _DeferredMapperConfig.config_for_cls(cls)
136 # regular mapping
137 elif _is_mapped_class(cls):
138 return class_mapper(cls, configure=False)
139 else:
140 return None
141
142
143def _is_supercls_for_inherits(cls: Type[Any]) -> bool:
144 """return True if this class will be used as a superclass to set in
145 'inherits'.
146
147 This includes deferred mapper configs that aren't mapped yet, however does
148 not include classes with _sa_decl_prepare_nocascade (e.g.
149 ``AbstractConcreteBase``); these concrete-only classes are not set up as
150 "inherits" until after mappers are configured using
151 mapper._set_concrete_base()
152
153 """
154 if _DeferredMapperConfig.has_cls(cls):
155 return not _get_immediate_cls_attr(
156 cls, "_sa_decl_prepare_nocascade", strict=True
157 )
158 # regular mapping
159 elif _is_mapped_class(cls):
160 return True
161 else:
162 return False
163
164
165def _resolve_for_abstract_or_classical(cls: Type[Any]) -> Optional[Type[Any]]:
166 if cls is object:
167 return None
168
169 sup: Optional[Type[Any]]
170
171 if cls.__dict__.get("__abstract__", False):
172 for base_ in cls.__bases__:
173 sup = _resolve_for_abstract_or_classical(base_)
174 if sup is not None:
175 return sup
176 else:
177 return None
178 else:
179 clsmanager = _dive_for_cls_manager(cls)
180
181 if clsmanager:
182 return clsmanager.class_
183 else:
184 return cls
185
186
187def _get_immediate_cls_attr(
188 cls: Type[Any], attrname: str, strict: bool = False
189) -> Optional[Any]:
190 """return an attribute of the class that is either present directly
191 on the class, e.g. not on a superclass, or is from a superclass but
192 this superclass is a non-mapped mixin, that is, not a descendant of
193 the declarative base and is also not classically mapped.
194
195 This is used to detect attributes that indicate something about
196 a mapped class independently from any mapped classes that it may
197 inherit from.
198
199 """
200
201 # the rules are different for this name than others,
202 # make sure we've moved it out. transitional
203 assert attrname != "__abstract__"
204
205 if not issubclass(cls, object):
206 return None
207
208 if attrname in cls.__dict__:
209 return getattr(cls, attrname)
210
211 for base in cls.__mro__[1:]:
212 _is_classical_inherits = _dive_for_cls_manager(base) is not None
213
214 if attrname in base.__dict__ and (
215 base is cls
216 or (
217 (base in cls.__bases__ if strict else True)
218 and not _is_classical_inherits
219 )
220 ):
221 return getattr(base, attrname)
222 else:
223 return None
224
225
226def _dive_for_cls_manager(cls: Type[_O]) -> Optional[ClassManager[_O]]:
227 # because the class manager registration is pluggable,
228 # we need to do the search for every class in the hierarchy,
229 # rather than just a simple "cls._sa_class_manager"
230
231 for base in cls.__mro__:
232 manager: Optional[ClassManager[_O]] = attributes.opt_manager_of_class(
233 base
234 )
235 if manager:
236 return manager
237 return None
238
239
240def _as_declarative(
241 registry: _RegistryType, cls: Type[Any], dict_: _ClassDict
242) -> Optional[_MapperConfig]:
243 # declarative scans the class for attributes. no table or mapper
244 # args passed separately.
245 return _MapperConfig.setup_mapping(registry, cls, dict_, None, {})
246
247
248def _mapper(
249 registry: _RegistryType,
250 cls: Type[_O],
251 table: Optional[FromClause],
252 mapper_kw: _MapperKwArgs,
253) -> Mapper[_O]:
254 _ImperativeMapperConfig(registry, cls, table, mapper_kw)
255 return cast("MappedClassProtocol[_O]", cls).__mapper__
256
257
258@util.preload_module("sqlalchemy.orm.decl_api")
259def _is_declarative_props(obj: Any) -> bool:
260 _declared_attr_common = util.preloaded.orm_decl_api._declared_attr_common
261
262 return isinstance(obj, (_declared_attr_common, util.classproperty))
263
264
265def _check_declared_props_nocascade(
266 obj: Any, name: str, cls: Type[_O]
267) -> bool:
268 if _is_declarative_props(obj):
269 if getattr(obj, "_cascading", False):
270 util.warn(
271 "@declared_attr.cascading is not supported on the %s "
272 "attribute on class %s. This attribute invokes for "
273 "subclasses in any case." % (name, cls)
274 )
275 return True
276 else:
277 return False
278
279
280class _MapperConfig:
281 __slots__ = (
282 "cls",
283 "classname",
284 "properties",
285 "declared_attr_reg",
286 "__weakref__",
287 )
288
289 cls: Type[Any]
290 classname: str
291 properties: util.OrderedDict[
292 str,
293 Union[
294 Sequence[NamedColumn[Any]], NamedColumn[Any], MapperProperty[Any]
295 ],
296 ]
297 declared_attr_reg: Dict[declared_attr[Any], Any]
298
299 @classmethod
300 def setup_mapping(
301 cls,
302 registry: _RegistryType,
303 cls_: Type[_O],
304 dict_: _ClassDict,
305 table: Optional[FromClause],
306 mapper_kw: _MapperKwArgs,
307 ) -> Optional[_MapperConfig]:
308 manager = attributes.opt_manager_of_class(cls)
309 if manager and manager.class_ is cls_:
310 raise exc.InvalidRequestError(
311 f"Class {cls!r} already has been instrumented declaratively"
312 )
313
314 if cls_.__dict__.get("__abstract__", False):
315 return None
316
317 defer_map = _get_immediate_cls_attr(
318 cls_, "_sa_decl_prepare_nocascade", strict=True
319 ) or hasattr(cls_, "_sa_decl_prepare")
320
321 if defer_map:
322 return _DeferredMapperConfig(
323 registry, cls_, dict_, table, mapper_kw
324 )
325 else:
326 return _ClassScanMapperConfig(
327 registry, cls_, dict_, table, mapper_kw
328 )
329
330 def __init__(
331 self,
332 registry: _RegistryType,
333 cls_: Type[Any],
334 mapper_kw: _MapperKwArgs,
335 ):
336 self.cls = util.assert_arg_type(cls_, type, "cls_")
337 self.classname = cls_.__name__
338 self.properties = util.OrderedDict()
339 self.declared_attr_reg = {}
340
341 if not mapper_kw.get("non_primary", False):
342 instrumentation.register_class(
343 self.cls,
344 finalize=False,
345 registry=registry,
346 declarative_scan=self,
347 init_method=registry.constructor,
348 )
349 else:
350 manager = attributes.opt_manager_of_class(self.cls)
351 if not manager or not manager.is_mapped:
352 raise exc.InvalidRequestError(
353 "Class %s has no primary mapper configured. Configure "
354 "a primary mapper first before setting up a non primary "
355 "Mapper." % self.cls
356 )
357
358 def set_cls_attribute(self, attrname: str, value: _T) -> _T:
359 manager = instrumentation.manager_of_class(self.cls)
360 manager.install_member(attrname, value)
361 return value
362
363 def map(self, mapper_kw: _MapperKwArgs = ...) -> Mapper[Any]:
364 raise NotImplementedError()
365
366 def _early_mapping(self, mapper_kw: _MapperKwArgs) -> None:
367 self.map(mapper_kw)
368
369
370class _ImperativeMapperConfig(_MapperConfig):
371 __slots__ = ("local_table", "inherits")
372
373 def __init__(
374 self,
375 registry: _RegistryType,
376 cls_: Type[_O],
377 table: Optional[FromClause],
378 mapper_kw: _MapperKwArgs,
379 ):
380 super().__init__(registry, cls_, mapper_kw)
381
382 self.local_table = self.set_cls_attribute("__table__", table)
383
384 with mapperlib._CONFIGURE_MUTEX:
385 if not mapper_kw.get("non_primary", False):
386 clsregistry.add_class(
387 self.classname, self.cls, registry._class_registry
388 )
389
390 self._setup_inheritance(mapper_kw)
391
392 self._early_mapping(mapper_kw)
393
394 def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]:
395 mapper_cls = Mapper
396
397 return self.set_cls_attribute(
398 "__mapper__",
399 mapper_cls(self.cls, self.local_table, **mapper_kw),
400 )
401
402 def _setup_inheritance(self, mapper_kw: _MapperKwArgs) -> None:
403 cls = self.cls
404
405 inherits = mapper_kw.get("inherits", None)
406
407 if inherits is None:
408 # since we search for classical mappings now, search for
409 # multiple mapped bases as well and raise an error.
410 inherits_search = []
411 for base_ in cls.__bases__:
412 c = _resolve_for_abstract_or_classical(base_)
413 if c is None:
414 continue
415
416 if _is_supercls_for_inherits(c) and c not in inherits_search:
417 inherits_search.append(c)
418
419 if inherits_search:
420 if len(inherits_search) > 1:
421 raise exc.InvalidRequestError(
422 "Class %s has multiple mapped bases: %r"
423 % (cls, inherits_search)
424 )
425 inherits = inherits_search[0]
426 elif isinstance(inherits, Mapper):
427 inherits = inherits.class_
428
429 self.inherits = inherits
430
431
432class _CollectedAnnotation(NamedTuple):
433 raw_annotation: _AnnotationScanType
434 mapped_container: Optional[Type[Mapped[Any]]]
435 extracted_mapped_annotation: Union[_AnnotationScanType, str]
436 is_dataclass: bool
437 attr_value: Any
438 originating_module: str
439 originating_class: Type[Any]
440
441
442class _ClassScanMapperConfig(_MapperConfig):
443 __slots__ = (
444 "registry",
445 "clsdict_view",
446 "collected_attributes",
447 "collected_annotations",
448 "local_table",
449 "persist_selectable",
450 "declared_columns",
451 "column_ordering",
452 "column_copies",
453 "table_args",
454 "tablename",
455 "mapper_args",
456 "mapper_args_fn",
457 "table_fn",
458 "inherits",
459 "single",
460 "allow_dataclass_fields",
461 "dataclass_setup_arguments",
462 "is_dataclass_prior_to_mapping",
463 "allow_unmapped_annotations",
464 )
465
466 is_deferred = False
467 registry: _RegistryType
468 clsdict_view: _ClassDict
469 collected_annotations: Dict[str, _CollectedAnnotation]
470 collected_attributes: Dict[str, Any]
471 local_table: Optional[FromClause]
472 persist_selectable: Optional[FromClause]
473 declared_columns: util.OrderedSet[Column[Any]]
474 column_ordering: Dict[Column[Any], int]
475 column_copies: Dict[
476 Union[MappedColumn[Any], Column[Any]],
477 Union[MappedColumn[Any], Column[Any]],
478 ]
479 tablename: Optional[str]
480 mapper_args: Mapping[str, Any]
481 table_args: Optional[_TableArgsType]
482 mapper_args_fn: Optional[Callable[[], Dict[str, Any]]]
483 inherits: Optional[Type[Any]]
484 single: bool
485
486 is_dataclass_prior_to_mapping: bool
487 allow_unmapped_annotations: bool
488
489 dataclass_setup_arguments: Optional[_DataclassArguments]
490 """if the class has SQLAlchemy native dataclass parameters, where
491 we will turn the class into a dataclass within the declarative mapping
492 process.
493
494 """
495
496 allow_dataclass_fields: bool
497 """if true, look for dataclass-processed Field objects on the target
498 class as well as superclasses and extract ORM mapping directives from
499 the "metadata" attribute of each Field.
500
501 if False, dataclass fields can still be used, however they won't be
502 mapped.
503
504 """
505
506 def __init__(
507 self,
508 registry: _RegistryType,
509 cls_: Type[_O],
510 dict_: _ClassDict,
511 table: Optional[FromClause],
512 mapper_kw: _MapperKwArgs,
513 ):
514 # grab class dict before the instrumentation manager has been added.
515 # reduces cycles
516 self.clsdict_view = (
517 util.immutabledict(dict_) if dict_ else util.EMPTY_DICT
518 )
519 super().__init__(registry, cls_, mapper_kw)
520 self.registry = registry
521 self.persist_selectable = None
522
523 self.collected_attributes = {}
524 self.collected_annotations = {}
525 self.declared_columns = util.OrderedSet()
526 self.column_ordering = {}
527 self.column_copies = {}
528 self.single = False
529 self.dataclass_setup_arguments = dca = getattr(
530 self.cls, "_sa_apply_dc_transforms", None
531 )
532
533 self.allow_unmapped_annotations = getattr(
534 self.cls, "__allow_unmapped__", False
535 ) or bool(self.dataclass_setup_arguments)
536
537 self.is_dataclass_prior_to_mapping = cld = dataclasses.is_dataclass(
538 cls_
539 )
540
541 sdk = _get_immediate_cls_attr(cls_, "__sa_dataclass_metadata_key__")
542
543 # we don't want to consume Field objects from a not-already-dataclass.
544 # the Field objects won't have their "name" or "type" populated,
545 # and while it seems like we could just set these on Field as we
546 # read them, Field is documented as "user read only" and we need to
547 # stay far away from any off-label use of dataclasses APIs.
548 if (not cld or dca) and sdk:
549 raise exc.InvalidRequestError(
550 "SQLAlchemy mapped dataclasses can't consume mapping "
551 "information from dataclass.Field() objects if the immediate "
552 "class is not already a dataclass."
553 )
554
555 # if already a dataclass, and __sa_dataclass_metadata_key__ present,
556 # then also look inside of dataclass.Field() objects yielded by
557 # dataclasses.get_fields(cls) when scanning for attributes
558 self.allow_dataclass_fields = bool(sdk and cld)
559
560 self._setup_declared_events()
561
562 self._scan_attributes()
563
564 self._setup_dataclasses_transforms()
565
566 with mapperlib._CONFIGURE_MUTEX:
567 clsregistry.add_class(
568 self.classname, self.cls, registry._class_registry
569 )
570
571 self._setup_inheriting_mapper(mapper_kw)
572
573 self._extract_mappable_attributes()
574
575 self._extract_declared_columns()
576
577 self._setup_table(table)
578
579 self._setup_inheriting_columns(mapper_kw)
580
581 self._early_mapping(mapper_kw)
582
583 def _setup_declared_events(self) -> None:
584 if _get_immediate_cls_attr(self.cls, "__declare_last__"):
585
586 @event.listens_for(Mapper, "after_configured")
587 def after_configured() -> None:
588 cast(
589 "_DeclMappedClassProtocol[Any]", self.cls
590 ).__declare_last__()
591
592 if _get_immediate_cls_attr(self.cls, "__declare_first__"):
593
594 @event.listens_for(Mapper, "before_configured")
595 def before_configured() -> None:
596 cast(
597 "_DeclMappedClassProtocol[Any]", self.cls
598 ).__declare_first__()
599
600 def _cls_attr_override_checker(
601 self, cls: Type[_O]
602 ) -> Callable[[str, Any], bool]:
603 """Produce a function that checks if a class has overridden an
604 attribute, taking SQLAlchemy-enabled dataclass fields into account.
605
606 """
607
608 if self.allow_dataclass_fields:
609 sa_dataclass_metadata_key = _get_immediate_cls_attr(
610 cls, "__sa_dataclass_metadata_key__"
611 )
612 else:
613 sa_dataclass_metadata_key = None
614
615 if not sa_dataclass_metadata_key:
616
617 def attribute_is_overridden(key: str, obj: Any) -> bool:
618 return getattr(cls, key, obj) is not obj
619
620 else:
621 all_datacls_fields = {
622 f.name: f.metadata[sa_dataclass_metadata_key]
623 for f in util.dataclass_fields(cls)
624 if sa_dataclass_metadata_key in f.metadata
625 }
626 local_datacls_fields = {
627 f.name: f.metadata[sa_dataclass_metadata_key]
628 for f in util.local_dataclass_fields(cls)
629 if sa_dataclass_metadata_key in f.metadata
630 }
631
632 absent = object()
633
634 def attribute_is_overridden(key: str, obj: Any) -> bool:
635 if _is_declarative_props(obj):
636 obj = obj.fget
637
638 # this function likely has some failure modes still if
639 # someone is doing a deep mixing of the same attribute
640 # name as plain Python attribute vs. dataclass field.
641
642 ret = local_datacls_fields.get(key, absent)
643 if _is_declarative_props(ret):
644 ret = ret.fget
645
646 if ret is obj:
647 return False
648 elif ret is not absent:
649 return True
650
651 all_field = all_datacls_fields.get(key, absent)
652
653 ret = getattr(cls, key, obj)
654
655 if ret is obj:
656 return False
657
658 # for dataclasses, this could be the
659 # 'default' of the field. so filter more specifically
660 # for an already-mapped InstrumentedAttribute
661 if ret is not absent and isinstance(
662 ret, InstrumentedAttribute
663 ):
664 return True
665
666 if all_field is obj:
667 return False
668 elif all_field is not absent:
669 return True
670
671 # can't find another attribute
672 return False
673
674 return attribute_is_overridden
675
676 _include_dunders = {
677 "__table__",
678 "__mapper_args__",
679 "__tablename__",
680 "__table_args__",
681 }
682
683 _match_exclude_dunders = re.compile(r"^(?:_sa_|__)")
684
685 def _cls_attr_resolver(
686 self, cls: Type[Any]
687 ) -> Callable[[], Iterable[Tuple[str, Any, Any, bool]]]:
688 """produce a function to iterate the "attributes" of a class
689 which we want to consider for mapping, adjusting for SQLAlchemy fields
690 embedded in dataclass fields.
691
692 """
693 cls_annotations = util.get_annotations(cls)
694
695 cls_vars = vars(cls)
696
697 _include_dunders = self._include_dunders
698 _match_exclude_dunders = self._match_exclude_dunders
699
700 names = [
701 n
702 for n in util.merge_lists_w_ordering(
703 list(cls_vars), list(cls_annotations)
704 )
705 if not _match_exclude_dunders.match(n) or n in _include_dunders
706 ]
707
708 if self.allow_dataclass_fields:
709 sa_dataclass_metadata_key: Optional[str] = _get_immediate_cls_attr(
710 cls, "__sa_dataclass_metadata_key__"
711 )
712 else:
713 sa_dataclass_metadata_key = None
714
715 if not sa_dataclass_metadata_key:
716
717 def local_attributes_for_class() -> (
718 Iterable[Tuple[str, Any, Any, bool]]
719 ):
720 return (
721 (
722 name,
723 cls_vars.get(name),
724 cls_annotations.get(name),
725 False,
726 )
727 for name in names
728 )
729
730 else:
731 dataclass_fields = {
732 field.name: field for field in util.local_dataclass_fields(cls)
733 }
734
735 fixed_sa_dataclass_metadata_key = sa_dataclass_metadata_key
736
737 def local_attributes_for_class() -> (
738 Iterable[Tuple[str, Any, Any, bool]]
739 ):
740 for name in names:
741 field = dataclass_fields.get(name, None)
742 if field and sa_dataclass_metadata_key in field.metadata:
743 yield field.name, _as_dc_declaredattr(
744 field.metadata, fixed_sa_dataclass_metadata_key
745 ), cls_annotations.get(field.name), True
746 else:
747 yield name, cls_vars.get(name), cls_annotations.get(
748 name
749 ), False
750
751 return local_attributes_for_class
752
753 def _scan_attributes(self) -> None:
754 cls = self.cls
755
756 cls_as_Decl = cast("_DeclMappedClassProtocol[Any]", cls)
757
758 clsdict_view = self.clsdict_view
759 collected_attributes = self.collected_attributes
760 column_copies = self.column_copies
761 _include_dunders = self._include_dunders
762 mapper_args_fn = None
763 table_args = inherited_table_args = None
764 table_fn = None
765 tablename = None
766 fixed_table = "__table__" in clsdict_view
767
768 attribute_is_overridden = self._cls_attr_override_checker(self.cls)
769
770 bases = []
771
772 for base in cls.__mro__:
773 # collect bases and make sure standalone columns are copied
774 # to be the column they will ultimately be on the class,
775 # so that declared_attr functions use the right columns.
776 # need to do this all the way up the hierarchy first
777 # (see #8190)
778
779 class_mapped = base is not cls and _is_supercls_for_inherits(base)
780
781 local_attributes_for_class = self._cls_attr_resolver(base)
782
783 if not class_mapped and base is not cls:
784 locally_collected_columns = self._produce_column_copies(
785 local_attributes_for_class,
786 attribute_is_overridden,
787 fixed_table,
788 base,
789 )
790 else:
791 locally_collected_columns = {}
792
793 bases.append(
794 (
795 base,
796 class_mapped,
797 local_attributes_for_class,
798 locally_collected_columns,
799 )
800 )
801
802 for (
803 base,
804 class_mapped,
805 local_attributes_for_class,
806 locally_collected_columns,
807 ) in bases:
808 # this transfer can also take place as we scan each name
809 # for finer-grained control of how collected_attributes is
810 # populated, as this is what impacts column ordering.
811 # however it's simpler to get it out of the way here.
812 collected_attributes.update(locally_collected_columns)
813
814 for (
815 name,
816 obj,
817 annotation,
818 is_dataclass_field,
819 ) in local_attributes_for_class():
820 if name in _include_dunders:
821 if name == "__mapper_args__":
822 check_decl = _check_declared_props_nocascade(
823 obj, name, cls
824 )
825 if not mapper_args_fn and (
826 not class_mapped or check_decl
827 ):
828 # don't even invoke __mapper_args__ until
829 # after we've determined everything about the
830 # mapped table.
831 # make a copy of it so a class-level dictionary
832 # is not overwritten when we update column-based
833 # arguments.
834 def _mapper_args_fn() -> Dict[str, Any]:
835 return dict(cls_as_Decl.__mapper_args__)
836
837 mapper_args_fn = _mapper_args_fn
838
839 elif name == "__tablename__":
840 check_decl = _check_declared_props_nocascade(
841 obj, name, cls
842 )
843 if not tablename and (not class_mapped or check_decl):
844 tablename = cls_as_Decl.__tablename__
845 elif name == "__table__":
846 check_decl = _check_declared_props_nocascade(
847 obj, name, cls
848 )
849 # if a @declared_attr using "__table__" is detected,
850 # wrap up a callable to look for "__table__" from
851 # the final concrete class when we set up a table.
852 # this was fixed by
853 # #11509, regression in 2.0 from version 1.4.
854 if check_decl and not table_fn:
855 # don't even invoke __table__ until we're ready
856 def _table_fn() -> FromClause:
857 return cls_as_Decl.__table__
858
859 table_fn = _table_fn
860
861 elif name == "__table_args__":
862 check_decl = _check_declared_props_nocascade(
863 obj, name, cls
864 )
865 if not table_args and (not class_mapped or check_decl):
866 table_args = cls_as_Decl.__table_args__
867 if not isinstance(
868 table_args, (tuple, dict, type(None))
869 ):
870 raise exc.ArgumentError(
871 "__table_args__ value must be a tuple, "
872 "dict, or None"
873 )
874 if base is not cls:
875 inherited_table_args = True
876 else:
877 # any other dunder names; should not be here
878 # as we have tested for all four names in
879 # _include_dunders
880 assert False
881 elif class_mapped:
882 if _is_declarative_props(obj) and not obj._quiet:
883 util.warn(
884 "Regular (i.e. not __special__) "
885 "attribute '%s.%s' uses @declared_attr, "
886 "but owning class %s is mapped - "
887 "not applying to subclass %s."
888 % (base.__name__, name, base, cls)
889 )
890
891 continue
892 elif base is not cls:
893 # we're a mixin, abstract base, or something that is
894 # acting like that for now.
895
896 if isinstance(obj, (Column, MappedColumn)):
897 # already copied columns to the mapped class.
898 continue
899 elif isinstance(obj, MapperProperty):
900 raise exc.InvalidRequestError(
901 "Mapper properties (i.e. deferred,"
902 "column_property(), relationship(), etc.) must "
903 "be declared as @declared_attr callables "
904 "on declarative mixin classes. For dataclass "
905 "field() objects, use a lambda:"
906 )
907 elif _is_declarative_props(obj):
908 # tried to get overloads to tell this to
909 # pylance, no luck
910 assert obj is not None
911
912 if obj._cascading:
913 if name in clsdict_view:
914 # unfortunately, while we can use the user-
915 # defined attribute here to allow a clean
916 # override, if there's another
917 # subclass below then it still tries to use
918 # this. not sure if there is enough
919 # information here to add this as a feature
920 # later on.
921 util.warn(
922 "Attribute '%s' on class %s cannot be "
923 "processed due to "
924 "@declared_attr.cascading; "
925 "skipping" % (name, cls)
926 )
927 collected_attributes[name] = column_copies[obj] = (
928 ret
929 ) = obj.__get__(obj, cls)
930 setattr(cls, name, ret)
931 else:
932 if is_dataclass_field:
933 # access attribute using normal class access
934 # first, to see if it's been mapped on a
935 # superclass. note if the dataclasses.field()
936 # has "default", this value can be anything.
937 ret = getattr(cls, name, None)
938
939 # so, if it's anything that's not ORM
940 # mapped, assume we should invoke the
941 # declared_attr
942 if not isinstance(ret, InspectionAttr):
943 ret = obj.fget()
944 else:
945 # access attribute using normal class access.
946 # if the declared attr already took place
947 # on a superclass that is mapped, then
948 # this is no longer a declared_attr, it will
949 # be the InstrumentedAttribute
950 ret = getattr(cls, name)
951
952 # correct for proxies created from hybrid_property
953 # or similar. note there is no known case that
954 # produces nested proxies, so we are only
955 # looking one level deep right now.
956
957 if (
958 isinstance(ret, InspectionAttr)
959 and attr_is_internal_proxy(ret)
960 and not isinstance(
961 ret.original_property, MapperProperty
962 )
963 ):
964 ret = ret.descriptor
965
966 collected_attributes[name] = column_copies[obj] = (
967 ret
968 )
969
970 if (
971 isinstance(ret, (Column, MapperProperty))
972 and ret.doc is None
973 ):
974 ret.doc = obj.__doc__
975
976 self._collect_annotation(
977 name,
978 obj._collect_return_annotation(),
979 base,
980 True,
981 obj,
982 )
983 elif _is_mapped_annotation(annotation, cls, base):
984 # Mapped annotation without any object.
985 # product_column_copies should have handled this.
986 # if future support for other MapperProperty,
987 # then test if this name is already handled and
988 # otherwise proceed to generate.
989 if not fixed_table:
990 assert (
991 name in collected_attributes
992 or attribute_is_overridden(name, None)
993 )
994 continue
995 else:
996 # here, the attribute is some other kind of
997 # property that we assume is not part of the
998 # declarative mapping. however, check for some
999 # more common mistakes
1000 self._warn_for_decl_attributes(base, name, obj)
1001 elif is_dataclass_field and (
1002 name not in clsdict_view or clsdict_view[name] is not obj
1003 ):
1004 # here, we are definitely looking at the target class
1005 # and not a superclass. this is currently a
1006 # dataclass-only path. if the name is only
1007 # a dataclass field and isn't in local cls.__dict__,
1008 # put the object there.
1009 # assert that the dataclass-enabled resolver agrees
1010 # with what we are seeing
1011
1012 assert not attribute_is_overridden(name, obj)
1013
1014 if _is_declarative_props(obj):
1015 obj = obj.fget()
1016
1017 collected_attributes[name] = obj
1018 self._collect_annotation(
1019 name, annotation, base, False, obj
1020 )
1021 else:
1022 collected_annotation = self._collect_annotation(
1023 name, annotation, base, None, obj
1024 )
1025 is_mapped = (
1026 collected_annotation is not None
1027 and collected_annotation.mapped_container is not None
1028 )
1029 generated_obj = (
1030 collected_annotation.attr_value
1031 if collected_annotation is not None
1032 else obj
1033 )
1034 if obj is None and not fixed_table and is_mapped:
1035 collected_attributes[name] = (
1036 generated_obj
1037 if generated_obj is not None
1038 else MappedColumn()
1039 )
1040 elif name in clsdict_view:
1041 collected_attributes[name] = obj
1042 # else if the name is not in the cls.__dict__,
1043 # don't collect it as an attribute.
1044 # we will see the annotation only, which is meaningful
1045 # both for mapping and dataclasses setup
1046
1047 if inherited_table_args and not tablename:
1048 table_args = None
1049
1050 self.table_args = table_args
1051 self.tablename = tablename
1052 self.mapper_args_fn = mapper_args_fn
1053 self.table_fn = table_fn
1054
1055 def _setup_dataclasses_transforms(self) -> None:
1056 dataclass_setup_arguments = self.dataclass_setup_arguments
1057 if not dataclass_setup_arguments:
1058 return
1059
1060 # can't use is_dataclass since it uses hasattr
1061 if "__dataclass_fields__" in self.cls.__dict__:
1062 raise exc.InvalidRequestError(
1063 f"Class {self.cls} is already a dataclass; ensure that "
1064 "base classes / decorator styles of establishing dataclasses "
1065 "are not being mixed. "
1066 "This can happen if a class that inherits from "
1067 "'MappedAsDataclass', even indirectly, is been mapped with "
1068 "'@registry.mapped_as_dataclass'"
1069 )
1070
1071 # can't create a dataclass if __table__ is already there. This would
1072 # fail an assertion when calling _get_arguments_for_make_dataclass:
1073 # assert False, "Mapped[] received without a mapping declaration"
1074 if "__table__" in self.cls.__dict__:
1075 raise exc.InvalidRequestError(
1076 f"Class {self.cls} already defines a '__table__'. "
1077 "ORM Annotated Dataclasses do not support a pre-existing "
1078 "'__table__' element"
1079 )
1080
1081 warn_for_non_dc_attrs = collections.defaultdict(list)
1082
1083 def _allow_dataclass_field(
1084 key: str, originating_class: Type[Any]
1085 ) -> bool:
1086 if (
1087 originating_class is not self.cls
1088 and "__dataclass_fields__" not in originating_class.__dict__
1089 ):
1090 warn_for_non_dc_attrs[originating_class].append(key)
1091
1092 return True
1093
1094 manager = instrumentation.manager_of_class(self.cls)
1095 assert manager is not None
1096
1097 field_list = [
1098 _AttributeOptions._get_arguments_for_make_dataclass(
1099 key,
1100 anno,
1101 mapped_container,
1102 self.collected_attributes.get(key, _NoArg.NO_ARG),
1103 )
1104 for key, anno, mapped_container in (
1105 (
1106 key,
1107 mapped_anno if mapped_anno else raw_anno,
1108 mapped_container,
1109 )
1110 for key, (
1111 raw_anno,
1112 mapped_container,
1113 mapped_anno,
1114 is_dc,
1115 attr_value,
1116 originating_module,
1117 originating_class,
1118 ) in self.collected_annotations.items()
1119 if _allow_dataclass_field(key, originating_class)
1120 and (
1121 key not in self.collected_attributes
1122 # issue #9226; check for attributes that we've collected
1123 # which are already instrumented, which we would assume
1124 # mean we are in an ORM inheritance mapping and this
1125 # attribute is already mapped on the superclass. Under
1126 # no circumstance should any QueryableAttribute be sent to
1127 # the dataclass() function; anything that's mapped should
1128 # be Field and that's it
1129 or not isinstance(
1130 self.collected_attributes[key], QueryableAttribute
1131 )
1132 )
1133 )
1134 ]
1135
1136 if warn_for_non_dc_attrs:
1137 for (
1138 originating_class,
1139 non_dc_attrs,
1140 ) in warn_for_non_dc_attrs.items():
1141 util.warn_deprecated(
1142 f"When transforming {self.cls} to a dataclass, "
1143 f"attribute(s) "
1144 f"{', '.join(repr(key) for key in non_dc_attrs)} "
1145 f"originates from superclass "
1146 f"{originating_class}, which is not a dataclass. This "
1147 f"usage is deprecated and will raise an error in "
1148 f"SQLAlchemy 2.1. When declaring SQLAlchemy Declarative "
1149 f"Dataclasses, ensure that all mixin classes and other "
1150 f"superclasses which include attributes are also a "
1151 f"subclass of MappedAsDataclass.",
1152 "2.0",
1153 code="dcmx",
1154 )
1155
1156 annotations = {}
1157 defaults = {}
1158 for item in field_list:
1159 if len(item) == 2:
1160 name, tp = item
1161 elif len(item) == 3:
1162 name, tp, spec = item
1163 defaults[name] = spec
1164 else:
1165 assert False
1166 annotations[name] = tp
1167
1168 for k, v in defaults.items():
1169 setattr(self.cls, k, v)
1170
1171 self._apply_dataclasses_to_any_class(
1172 dataclass_setup_arguments, self.cls, annotations
1173 )
1174
1175 @classmethod
1176 def _update_annotations_for_non_mapped_class(
1177 cls, klass: Type[_O]
1178 ) -> Mapping[str, _AnnotationScanType]:
1179 cls_annotations = util.get_annotations(klass)
1180
1181 new_anno = {}
1182 for name, annotation in cls_annotations.items():
1183 if _is_mapped_annotation(annotation, klass, klass):
1184 extracted = _extract_mapped_subtype(
1185 annotation,
1186 klass,
1187 klass.__module__,
1188 name,
1189 type(None),
1190 required=False,
1191 is_dataclass_field=False,
1192 expect_mapped=False,
1193 )
1194 if extracted:
1195 inner, _ = extracted
1196 new_anno[name] = inner
1197 else:
1198 new_anno[name] = annotation
1199 return new_anno
1200
1201 @classmethod
1202 def _apply_dataclasses_to_any_class(
1203 cls,
1204 dataclass_setup_arguments: _DataclassArguments,
1205 klass: Type[_O],
1206 use_annotations: Mapping[str, _AnnotationScanType],
1207 ) -> None:
1208 cls._assert_dc_arguments(dataclass_setup_arguments)
1209
1210 dataclass_callable = dataclass_setup_arguments["dataclass_callable"]
1211 if dataclass_callable is _NoArg.NO_ARG:
1212 dataclass_callable = dataclasses.dataclass
1213
1214 restored: Optional[Any]
1215
1216 if use_annotations:
1217 # apply constructed annotations that should look "normal" to a
1218 # dataclasses callable, based on the fields present. This
1219 # means remove the Mapped[] container and ensure all Field
1220 # entries have an annotation
1221 restored = getattr(klass, "__annotations__", None)
1222 klass.__annotations__ = cast("Dict[str, Any]", use_annotations)
1223 else:
1224 restored = None
1225
1226 try:
1227 dataclass_callable( # type: ignore[call-overload]
1228 klass,
1229 **{ # type: ignore[call-overload,unused-ignore]
1230 k: v
1231 for k, v in dataclass_setup_arguments.items()
1232 if v is not _NoArg.NO_ARG and k != "dataclass_callable"
1233 },
1234 )
1235 except (TypeError, ValueError) as ex:
1236 raise exc.InvalidRequestError(
1237 f"Python dataclasses error encountered when creating "
1238 f"dataclass for {klass.__name__!r}: "
1239 f"{ex!r}. Please refer to Python dataclasses "
1240 "documentation for additional information.",
1241 code="dcte",
1242 ) from ex
1243 finally:
1244 # restore original annotations outside of the dataclasses
1245 # process; for mixins and __abstract__ superclasses, SQLAlchemy
1246 # Declarative will need to see the Mapped[] container inside the
1247 # annotations in order to map subclasses
1248 if use_annotations:
1249 if restored is None:
1250 del klass.__annotations__
1251 else:
1252 klass.__annotations__ = restored
1253
1254 @classmethod
1255 def _assert_dc_arguments(cls, arguments: _DataclassArguments) -> None:
1256 allowed = {
1257 "init",
1258 "repr",
1259 "order",
1260 "eq",
1261 "unsafe_hash",
1262 "kw_only",
1263 "match_args",
1264 "dataclass_callable",
1265 }
1266 disallowed_args = set(arguments).difference(allowed)
1267 if disallowed_args:
1268 msg = ", ".join(f"{arg!r}" for arg in sorted(disallowed_args))
1269 raise exc.ArgumentError(
1270 f"Dataclass argument(s) {msg} are not accepted"
1271 )
1272
1273 def _collect_annotation(
1274 self,
1275 name: str,
1276 raw_annotation: _AnnotationScanType,
1277 originating_class: Type[Any],
1278 expect_mapped: Optional[bool],
1279 attr_value: Any,
1280 ) -> Optional[_CollectedAnnotation]:
1281 if name in self.collected_annotations:
1282 return self.collected_annotations[name]
1283
1284 if raw_annotation is None:
1285 return None
1286
1287 is_dataclass = self.is_dataclass_prior_to_mapping
1288 allow_unmapped = self.allow_unmapped_annotations
1289
1290 if expect_mapped is None:
1291 is_dataclass_field = isinstance(attr_value, dataclasses.Field)
1292 expect_mapped = (
1293 not is_dataclass_field
1294 and not allow_unmapped
1295 and (
1296 attr_value is None
1297 or isinstance(attr_value, _MappedAttribute)
1298 )
1299 )
1300
1301 is_dataclass_field = False
1302 extracted = _extract_mapped_subtype(
1303 raw_annotation,
1304 self.cls,
1305 originating_class.__module__,
1306 name,
1307 type(attr_value),
1308 required=False,
1309 is_dataclass_field=is_dataclass_field,
1310 expect_mapped=expect_mapped and not is_dataclass,
1311 )
1312 if extracted is None:
1313 # ClassVar can come out here
1314 return None
1315
1316 extracted_mapped_annotation, mapped_container = extracted
1317
1318 if attr_value is None and not is_literal(extracted_mapped_annotation):
1319 for elem in get_args(extracted_mapped_annotation):
1320 if is_fwd_ref(
1321 elem, check_generic=True, check_for_plain_string=True
1322 ):
1323 elem = de_stringify_annotation(
1324 self.cls,
1325 elem,
1326 originating_class.__module__,
1327 include_generic=True,
1328 )
1329 # look in Annotated[...] for an ORM construct,
1330 # such as Annotated[int, mapped_column(primary_key=True)]
1331 if isinstance(elem, _IntrospectsAnnotations):
1332 attr_value = elem.found_in_pep593_annotated()
1333
1334 self.collected_annotations[name] = ca = _CollectedAnnotation(
1335 raw_annotation,
1336 mapped_container,
1337 extracted_mapped_annotation,
1338 is_dataclass,
1339 attr_value,
1340 originating_class.__module__,
1341 originating_class,
1342 )
1343 return ca
1344
1345 def _warn_for_decl_attributes(
1346 self, cls: Type[Any], key: str, c: Any
1347 ) -> None:
1348 if isinstance(c, expression.ColumnElement):
1349 util.warn(
1350 f"Attribute '{key}' on class {cls} appears to "
1351 "be a non-schema SQLAlchemy expression "
1352 "object; this won't be part of the declarative mapping. "
1353 "To map arbitrary expressions, use ``column_property()`` "
1354 "or a similar function such as ``deferred()``, "
1355 "``query_expression()`` etc. "
1356 )
1357
1358 def _produce_column_copies(
1359 self,
1360 attributes_for_class: Callable[
1361 [], Iterable[Tuple[str, Any, Any, bool]]
1362 ],
1363 attribute_is_overridden: Callable[[str, Any], bool],
1364 fixed_table: bool,
1365 originating_class: Type[Any],
1366 ) -> Dict[str, Union[Column[Any], MappedColumn[Any]]]:
1367 cls = self.cls
1368 dict_ = self.clsdict_view
1369 locally_collected_attributes = {}
1370 column_copies = self.column_copies
1371 # copy mixin columns to the mapped class
1372
1373 for name, obj, annotation, is_dataclass in attributes_for_class():
1374 if (
1375 not fixed_table
1376 and obj is None
1377 and _is_mapped_annotation(annotation, cls, originating_class)
1378 ):
1379 # obj is None means this is the annotation only path
1380
1381 if attribute_is_overridden(name, obj):
1382 # perform same "overridden" check as we do for
1383 # Column/MappedColumn, this is how a mixin col is not
1384 # applied to an inherited subclass that does not have
1385 # the mixin. the anno-only path added here for
1386 # #9564
1387 continue
1388
1389 collected_annotation = self._collect_annotation(
1390 name, annotation, originating_class, True, obj
1391 )
1392 obj = (
1393 collected_annotation.attr_value
1394 if collected_annotation is not None
1395 else obj
1396 )
1397 if obj is None:
1398 obj = MappedColumn()
1399
1400 locally_collected_attributes[name] = obj
1401 setattr(cls, name, obj)
1402
1403 elif isinstance(obj, (Column, MappedColumn)):
1404 if attribute_is_overridden(name, obj):
1405 # if column has been overridden
1406 # (like by the InstrumentedAttribute of the
1407 # superclass), skip. don't collect the annotation
1408 # either (issue #8718)
1409 continue
1410
1411 collected_annotation = self._collect_annotation(
1412 name, annotation, originating_class, True, obj
1413 )
1414 obj = (
1415 collected_annotation.attr_value
1416 if collected_annotation is not None
1417 else obj
1418 )
1419
1420 if name not in dict_ and not (
1421 "__table__" in dict_
1422 and (getattr(obj, "name", None) or name)
1423 in dict_["__table__"].c
1424 ):
1425 if obj.foreign_keys:
1426 for fk in obj.foreign_keys:
1427 if (
1428 fk._table_column is not None
1429 and fk._table_column.table is None
1430 ):
1431 raise exc.InvalidRequestError(
1432 "Columns with foreign keys to "
1433 "non-table-bound "
1434 "columns must be declared as "
1435 "@declared_attr callables "
1436 "on declarative mixin classes. "
1437 "For dataclass "
1438 "field() objects, use a lambda:."
1439 )
1440
1441 column_copies[obj] = copy_ = obj._copy()
1442
1443 locally_collected_attributes[name] = copy_
1444 setattr(cls, name, copy_)
1445
1446 return locally_collected_attributes
1447
1448 def _extract_mappable_attributes(self) -> None:
1449 cls = self.cls
1450 collected_attributes = self.collected_attributes
1451
1452 our_stuff = self.properties
1453
1454 _include_dunders = self._include_dunders
1455
1456 late_mapped = _get_immediate_cls_attr(
1457 cls, "_sa_decl_prepare_nocascade", strict=True
1458 )
1459
1460 allow_unmapped_annotations = self.allow_unmapped_annotations
1461 expect_annotations_wo_mapped = (
1462 allow_unmapped_annotations or self.is_dataclass_prior_to_mapping
1463 )
1464
1465 look_for_dataclass_things = bool(self.dataclass_setup_arguments)
1466
1467 for k in list(collected_attributes):
1468 if k in _include_dunders:
1469 continue
1470
1471 value = collected_attributes[k]
1472
1473 if _is_declarative_props(value):
1474 # @declared_attr in collected_attributes only occurs here for a
1475 # @declared_attr that's directly on the mapped class;
1476 # for a mixin, these have already been evaluated
1477 if value._cascading:
1478 util.warn(
1479 "Use of @declared_attr.cascading only applies to "
1480 "Declarative 'mixin' and 'abstract' classes. "
1481 "Currently, this flag is ignored on mapped class "
1482 "%s" % self.cls
1483 )
1484
1485 value = getattr(cls, k)
1486
1487 elif (
1488 isinstance(value, QueryableAttribute)
1489 and value.class_ is not cls
1490 and value.key != k
1491 ):
1492 # detect a QueryableAttribute that's already mapped being
1493 # assigned elsewhere in userland, turn into a synonym()
1494 value = SynonymProperty(value.key)
1495 setattr(cls, k, value)
1496
1497 if (
1498 isinstance(value, tuple)
1499 and len(value) == 1
1500 and isinstance(value[0], (Column, _MappedAttribute))
1501 ):
1502 util.warn(
1503 "Ignoring declarative-like tuple value of attribute "
1504 "'%s': possibly a copy-and-paste error with a comma "
1505 "accidentally placed at the end of the line?" % k
1506 )
1507 continue
1508 elif look_for_dataclass_things and isinstance(
1509 value, dataclasses.Field
1510 ):
1511 # we collected a dataclass Field; dataclasses would have
1512 # set up the correct state on the class
1513 continue
1514 elif not isinstance(value, (Column, _DCAttributeOptions)):
1515 # using @declared_attr for some object that
1516 # isn't Column/MapperProperty/_DCAttributeOptions; remove
1517 # from the clsdict_view
1518 # and place the evaluated value onto the class.
1519 collected_attributes.pop(k)
1520 self._warn_for_decl_attributes(cls, k, value)
1521 if not late_mapped:
1522 setattr(cls, k, value)
1523 continue
1524 # we expect to see the name 'metadata' in some valid cases;
1525 # however at this point we see it's assigned to something trying
1526 # to be mapped, so raise for that.
1527 # TODO: should "registry" here be also? might be too late
1528 # to change that now (2.0 betas)
1529 elif k in ("metadata",):
1530 raise exc.InvalidRequestError(
1531 f"Attribute name '{k}' is reserved when using the "
1532 "Declarative API."
1533 )
1534 elif isinstance(value, Column):
1535 _undefer_column_name(
1536 k, self.column_copies.get(value, value) # type: ignore
1537 )
1538 else:
1539 if isinstance(value, _IntrospectsAnnotations):
1540 (
1541 annotation,
1542 mapped_container,
1543 extracted_mapped_annotation,
1544 is_dataclass,
1545 attr_value,
1546 originating_module,
1547 originating_class,
1548 ) = self.collected_annotations.get(
1549 k, (None, None, None, False, None, None, None)
1550 )
1551
1552 # issue #8692 - don't do any annotation interpretation if
1553 # an annotation were present and a container such as
1554 # Mapped[] etc. were not used. If annotation is None,
1555 # do declarative_scan so that the property can raise
1556 # for required
1557 if (
1558 mapped_container is not None
1559 or annotation is None
1560 # issue #10516: need to do declarative_scan even with
1561 # a non-Mapped annotation if we are doing
1562 # __allow_unmapped__, for things like col.name
1563 # assignment
1564 or allow_unmapped_annotations
1565 ):
1566 try:
1567 value.declarative_scan(
1568 self,
1569 self.registry,
1570 cls,
1571 originating_module,
1572 k,
1573 mapped_container,
1574 annotation,
1575 extracted_mapped_annotation,
1576 is_dataclass,
1577 )
1578 except NameError as ne:
1579 raise orm_exc.MappedAnnotationError(
1580 f"Could not resolve all types within mapped "
1581 f'annotation: "{annotation}". Ensure all '
1582 f"types are written correctly and are "
1583 f"imported within the module in use."
1584 ) from ne
1585 else:
1586 # assert that we were expecting annotations
1587 # without Mapped[] were going to be passed.
1588 # otherwise an error should have been raised
1589 # by util._extract_mapped_subtype before we got here.
1590 assert expect_annotations_wo_mapped
1591
1592 if isinstance(value, _DCAttributeOptions):
1593 if (
1594 value._has_dataclass_arguments
1595 and not look_for_dataclass_things
1596 ):
1597 if isinstance(value, MapperProperty):
1598 argnames = [
1599 "init",
1600 "default_factory",
1601 "repr",
1602 "default",
1603 "dataclass_metadata",
1604 ]
1605 else:
1606 argnames = [
1607 "init",
1608 "default_factory",
1609 "repr",
1610 "dataclass_metadata",
1611 ]
1612
1613 args = {
1614 a
1615 for a in argnames
1616 if getattr(
1617 value._attribute_options, f"dataclasses_{a}"
1618 )
1619 is not _NoArg.NO_ARG
1620 }
1621
1622 raise exc.ArgumentError(
1623 f"Attribute '{k}' on class {cls} includes "
1624 f"dataclasses argument(s): "
1625 f"{', '.join(sorted(repr(a) for a in args))} but "
1626 f"class does not specify "
1627 "SQLAlchemy native dataclass configuration."
1628 )
1629
1630 if not isinstance(value, (MapperProperty, _MapsColumns)):
1631 # filter for _DCAttributeOptions objects that aren't
1632 # MapperProperty / mapped_column(). Currently this
1633 # includes AssociationProxy. pop it from the things
1634 # we're going to map and set it up as a descriptor
1635 # on the class.
1636 collected_attributes.pop(k)
1637
1638 # Assoc Prox (or other descriptor object that may
1639 # use _DCAttributeOptions) is usually here, except if
1640 # 1. we're a
1641 # dataclass, dataclasses would have removed the
1642 # attr here or 2. assoc proxy is coming from a
1643 # superclass, we want it to be direct here so it
1644 # tracks state or 3. assoc prox comes from
1645 # declared_attr, uncommon case
1646 setattr(cls, k, value)
1647 continue
1648
1649 our_stuff[k] = value
1650
1651 def _extract_declared_columns(self) -> None:
1652 our_stuff = self.properties
1653
1654 # extract columns from the class dict
1655 declared_columns = self.declared_columns
1656 column_ordering = self.column_ordering
1657 name_to_prop_key = collections.defaultdict(set)
1658
1659 for key, c in list(our_stuff.items()):
1660 if isinstance(c, _MapsColumns):
1661 mp_to_assign = c.mapper_property_to_assign
1662 if mp_to_assign:
1663 our_stuff[key] = mp_to_assign
1664 else:
1665 # if no mapper property to assign, this currently means
1666 # this is a MappedColumn that will produce a Column for us
1667 del our_stuff[key]
1668
1669 for col, sort_order in c.columns_to_assign:
1670 if not isinstance(c, CompositeProperty):
1671 name_to_prop_key[col.name].add(key)
1672 declared_columns.add(col)
1673
1674 # we would assert this, however we want the below
1675 # warning to take effect instead. See #9630
1676 # assert col not in column_ordering
1677
1678 column_ordering[col] = sort_order
1679
1680 # if this is a MappedColumn and the attribute key we
1681 # have is not what the column has for its key, map the
1682 # Column explicitly under the attribute key name.
1683 # otherwise, Mapper will map it under the column key.
1684 if mp_to_assign is None and key != col.key:
1685 our_stuff[key] = col
1686 elif isinstance(c, Column):
1687 # undefer previously occurred here, and now occurs earlier.
1688 # ensure every column we get here has been named
1689 assert c.name is not None
1690 name_to_prop_key[c.name].add(key)
1691 declared_columns.add(c)
1692 # if the column is the same name as the key,
1693 # remove it from the explicit properties dict.
1694 # the normal rules for assigning column-based properties
1695 # will take over, including precedence of columns
1696 # in multi-column ColumnProperties.
1697 if key == c.key:
1698 del our_stuff[key]
1699
1700 for name, keys in name_to_prop_key.items():
1701 if len(keys) > 1:
1702 util.warn(
1703 "On class %r, Column object %r named "
1704 "directly multiple times, "
1705 "only one will be used: %s. "
1706 "Consider using orm.synonym instead"
1707 % (self.classname, name, (", ".join(sorted(keys))))
1708 )
1709
1710 def _setup_table(self, table: Optional[FromClause] = None) -> None:
1711 cls = self.cls
1712 cls_as_Decl = cast("MappedClassProtocol[Any]", cls)
1713
1714 tablename = self.tablename
1715 table_args = self.table_args
1716 clsdict_view = self.clsdict_view
1717 declared_columns = self.declared_columns
1718 column_ordering = self.column_ordering
1719
1720 manager = attributes.manager_of_class(cls)
1721
1722 if (
1723 self.table_fn is None
1724 and "__table__" not in clsdict_view
1725 and table is None
1726 ):
1727 if hasattr(cls, "__table_cls__"):
1728 table_cls = cast(
1729 Type[Table],
1730 util.unbound_method_to_callable(cls.__table_cls__), # type: ignore # noqa: E501
1731 )
1732 else:
1733 table_cls = Table
1734
1735 if tablename is not None:
1736 args: Tuple[Any, ...] = ()
1737 table_kw: Dict[str, Any] = {}
1738
1739 if table_args:
1740 if isinstance(table_args, dict):
1741 table_kw = table_args
1742 elif isinstance(table_args, tuple):
1743 if isinstance(table_args[-1], dict):
1744 args, table_kw = table_args[0:-1], table_args[-1]
1745 else:
1746 args = table_args
1747
1748 autoload_with = clsdict_view.get("__autoload_with__")
1749 if autoload_with:
1750 table_kw["autoload_with"] = autoload_with
1751
1752 autoload = clsdict_view.get("__autoload__")
1753 if autoload:
1754 table_kw["autoload"] = True
1755
1756 sorted_columns = sorted(
1757 declared_columns,
1758 key=lambda c: column_ordering.get(c, 0),
1759 )
1760 table = self.set_cls_attribute(
1761 "__table__",
1762 table_cls(
1763 tablename,
1764 self._metadata_for_cls(manager),
1765 *sorted_columns,
1766 *args,
1767 **table_kw,
1768 ),
1769 )
1770 else:
1771 if table is None:
1772 if self.table_fn:
1773 table = self.set_cls_attribute(
1774 "__table__", self.table_fn()
1775 )
1776 else:
1777 table = cls_as_Decl.__table__
1778 if declared_columns:
1779 for c in declared_columns:
1780 if not table.c.contains_column(c):
1781 raise exc.ArgumentError(
1782 "Can't add additional column %r when "
1783 "specifying __table__" % c.key
1784 )
1785
1786 self.local_table = table
1787
1788 def _metadata_for_cls(self, manager: ClassManager[Any]) -> MetaData:
1789 meta: Optional[MetaData] = getattr(self.cls, "metadata", None)
1790 if meta is not None:
1791 return meta
1792 else:
1793 return manager.registry.metadata
1794
1795 def _setup_inheriting_mapper(self, mapper_kw: _MapperKwArgs) -> None:
1796 cls = self.cls
1797
1798 inherits = mapper_kw.get("inherits", None)
1799
1800 if inherits is None:
1801 # since we search for classical mappings now, search for
1802 # multiple mapped bases as well and raise an error.
1803 inherits_search = []
1804 for base_ in cls.__bases__:
1805 c = _resolve_for_abstract_or_classical(base_)
1806 if c is None:
1807 continue
1808
1809 if _is_supercls_for_inherits(c) and c not in inherits_search:
1810 inherits_search.append(c)
1811
1812 if inherits_search:
1813 if len(inherits_search) > 1:
1814 raise exc.InvalidRequestError(
1815 "Class %s has multiple mapped bases: %r"
1816 % (cls, inherits_search)
1817 )
1818 inherits = inherits_search[0]
1819 elif isinstance(inherits, Mapper):
1820 inherits = inherits.class_
1821
1822 self.inherits = inherits
1823
1824 clsdict_view = self.clsdict_view
1825 if "__table__" not in clsdict_view and self.tablename is None:
1826 self.single = True
1827
1828 def _setup_inheriting_columns(self, mapper_kw: _MapperKwArgs) -> None:
1829 table = self.local_table
1830 cls = self.cls
1831 table_args = self.table_args
1832 declared_columns = self.declared_columns
1833
1834 if (
1835 table is None
1836 and self.inherits is None
1837 and not _get_immediate_cls_attr(cls, "__no_table__")
1838 ):
1839 raise exc.InvalidRequestError(
1840 "Class %r does not have a __table__ or __tablename__ "
1841 "specified and does not inherit from an existing "
1842 "table-mapped class." % cls
1843 )
1844 elif self.inherits:
1845 inherited_mapper_or_config = _declared_mapping_info(self.inherits)
1846 assert inherited_mapper_or_config is not None
1847 inherited_table = inherited_mapper_or_config.local_table
1848 inherited_persist_selectable = (
1849 inherited_mapper_or_config.persist_selectable
1850 )
1851
1852 if table is None:
1853 # single table inheritance.
1854 # ensure no table args
1855 if table_args:
1856 raise exc.ArgumentError(
1857 "Can't place __table_args__ on an inherited class "
1858 "with no table."
1859 )
1860
1861 # add any columns declared here to the inherited table.
1862 if declared_columns and not isinstance(inherited_table, Table):
1863 raise exc.ArgumentError(
1864 f"Can't declare columns on single-table-inherited "
1865 f"subclass {self.cls}; superclass {self.inherits} "
1866 "is not mapped to a Table"
1867 )
1868
1869 for col in declared_columns:
1870 assert inherited_table is not None
1871 if col.name in inherited_table.c:
1872 if inherited_table.c[col.name] is col:
1873 continue
1874 raise exc.ArgumentError(
1875 f"Column '{col}' on class {cls.__name__} "
1876 f"conflicts with existing column "
1877 f"'{inherited_table.c[col.name]}'. If using "
1878 f"Declarative, consider using the "
1879 "use_existing_column parameter of mapped_column() "
1880 "to resolve conflicts."
1881 )
1882 if col.primary_key:
1883 raise exc.ArgumentError(
1884 "Can't place primary key columns on an inherited "
1885 "class with no table."
1886 )
1887
1888 if TYPE_CHECKING:
1889 assert isinstance(inherited_table, Table)
1890
1891 inherited_table.append_column(col)
1892 if (
1893 inherited_persist_selectable is not None
1894 and inherited_persist_selectable is not inherited_table
1895 ):
1896 inherited_persist_selectable._refresh_for_new_column(
1897 col
1898 )
1899
1900 def _prepare_mapper_arguments(self, mapper_kw: _MapperKwArgs) -> None:
1901 properties = self.properties
1902
1903 if self.mapper_args_fn:
1904 mapper_args = self.mapper_args_fn()
1905 else:
1906 mapper_args = {}
1907
1908 if mapper_kw:
1909 mapper_args.update(mapper_kw)
1910
1911 if "properties" in mapper_args:
1912 properties = dict(properties)
1913 properties.update(mapper_args["properties"])
1914
1915 # make sure that column copies are used rather
1916 # than the original columns from any mixins
1917 for k in ("version_id_col", "polymorphic_on"):
1918 if k in mapper_args:
1919 v = mapper_args[k]
1920 mapper_args[k] = self.column_copies.get(v, v)
1921
1922 if "primary_key" in mapper_args:
1923 mapper_args["primary_key"] = [
1924 self.column_copies.get(v, v)
1925 for v in util.to_list(mapper_args["primary_key"])
1926 ]
1927
1928 if "inherits" in mapper_args:
1929 inherits_arg = mapper_args["inherits"]
1930 if isinstance(inherits_arg, Mapper):
1931 inherits_arg = inherits_arg.class_
1932
1933 if inherits_arg is not self.inherits:
1934 raise exc.InvalidRequestError(
1935 "mapper inherits argument given for non-inheriting "
1936 "class %s" % (mapper_args["inherits"])
1937 )
1938
1939 if self.inherits:
1940 mapper_args["inherits"] = self.inherits
1941
1942 if self.inherits and not mapper_args.get("concrete", False):
1943 # note the superclass is expected to have a Mapper assigned and
1944 # not be a deferred config, as this is called within map()
1945 inherited_mapper = class_mapper(self.inherits, False)
1946 inherited_table = inherited_mapper.local_table
1947
1948 # single or joined inheritance
1949 # exclude any cols on the inherited table which are
1950 # not mapped on the parent class, to avoid
1951 # mapping columns specific to sibling/nephew classes
1952 if "exclude_properties" not in mapper_args:
1953 mapper_args["exclude_properties"] = exclude_properties = {
1954 c.key
1955 for c in inherited_table.c
1956 if c not in inherited_mapper._columntoproperty
1957 }.union(inherited_mapper.exclude_properties or ())
1958 exclude_properties.difference_update(
1959 [c.key for c in self.declared_columns]
1960 )
1961
1962 # look through columns in the current mapper that
1963 # are keyed to a propname different than the colname
1964 # (if names were the same, we'd have popped it out above,
1965 # in which case the mapper makes this combination).
1966 # See if the superclass has a similar column property.
1967 # If so, join them together.
1968 for k, col in list(properties.items()):
1969 if not isinstance(col, expression.ColumnElement):
1970 continue
1971 if k in inherited_mapper._props:
1972 p = inherited_mapper._props[k]
1973 if isinstance(p, ColumnProperty):
1974 # note here we place the subclass column
1975 # first. See [ticket:1892] for background.
1976 properties[k] = [col] + p.columns
1977 result_mapper_args = mapper_args.copy()
1978 result_mapper_args["properties"] = properties
1979 self.mapper_args = result_mapper_args
1980
1981 def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]:
1982 self._prepare_mapper_arguments(mapper_kw)
1983 if hasattr(self.cls, "__mapper_cls__"):
1984 mapper_cls = cast(
1985 "Type[Mapper[Any]]",
1986 util.unbound_method_to_callable(
1987 self.cls.__mapper_cls__ # type: ignore
1988 ),
1989 )
1990 else:
1991 mapper_cls = Mapper
1992
1993 return self.set_cls_attribute(
1994 "__mapper__",
1995 mapper_cls(self.cls, self.local_table, **self.mapper_args),
1996 )
1997
1998
1999@util.preload_module("sqlalchemy.orm.decl_api")
2000def _as_dc_declaredattr(
2001 field_metadata: Mapping[str, Any], sa_dataclass_metadata_key: str
2002) -> Any:
2003 # wrap lambdas inside dataclass fields inside an ad-hoc declared_attr.
2004 # we can't write it because field.metadata is immutable :( so we have
2005 # to go through extra trouble to compare these
2006 decl_api = util.preloaded.orm_decl_api
2007 obj = field_metadata[sa_dataclass_metadata_key]
2008 if callable(obj) and not isinstance(obj, decl_api.declared_attr):
2009 return decl_api.declared_attr(obj)
2010 else:
2011 return obj
2012
2013
2014class _DeferredMapperConfig(_ClassScanMapperConfig):
2015 _cls: weakref.ref[Type[Any]]
2016
2017 is_deferred = True
2018
2019 _configs: util.OrderedDict[
2020 weakref.ref[Type[Any]], _DeferredMapperConfig
2021 ] = util.OrderedDict()
2022
2023 def _early_mapping(self, mapper_kw: _MapperKwArgs) -> None:
2024 pass
2025
2026 @property
2027 def cls(self) -> Type[Any]:
2028 return self._cls() # type: ignore
2029
2030 @cls.setter
2031 def cls(self, class_: Type[Any]) -> None:
2032 self._cls = weakref.ref(class_, self._remove_config_cls)
2033 self._configs[self._cls] = self
2034
2035 @classmethod
2036 def _remove_config_cls(cls, ref: weakref.ref[Type[Any]]) -> None:
2037 cls._configs.pop(ref, None)
2038
2039 @classmethod
2040 def has_cls(cls, class_: Type[Any]) -> bool:
2041 # 2.6 fails on weakref if class_ is an old style class
2042 return isinstance(class_, type) and weakref.ref(class_) in cls._configs
2043
2044 @classmethod
2045 def raise_unmapped_for_cls(cls, class_: Type[Any]) -> NoReturn:
2046 if hasattr(class_, "_sa_raise_deferred_config"):
2047 class_._sa_raise_deferred_config()
2048
2049 raise orm_exc.UnmappedClassError(
2050 class_,
2051 msg=(
2052 f"Class {orm_exc._safe_cls_name(class_)} has a deferred "
2053 "mapping on it. It is not yet usable as a mapped class."
2054 ),
2055 )
2056
2057 @classmethod
2058 def config_for_cls(cls, class_: Type[Any]) -> _DeferredMapperConfig:
2059 return cls._configs[weakref.ref(class_)]
2060
2061 @classmethod
2062 def classes_for_base(
2063 cls, base_cls: Type[Any], sort: bool = True
2064 ) -> List[_DeferredMapperConfig]:
2065 classes_for_base = [
2066 m
2067 for m, cls_ in [(m, m.cls) for m in cls._configs.values()]
2068 if cls_ is not None and issubclass(cls_, base_cls)
2069 ]
2070
2071 if not sort:
2072 return classes_for_base
2073
2074 all_m_by_cls = {m.cls: m for m in classes_for_base}
2075
2076 tuples: List[Tuple[_DeferredMapperConfig, _DeferredMapperConfig]] = []
2077 for m_cls in all_m_by_cls:
2078 tuples.extend(
2079 (all_m_by_cls[base_cls], all_m_by_cls[m_cls])
2080 for base_cls in m_cls.__bases__
2081 if base_cls in all_m_by_cls
2082 )
2083 return list(topological.sort(tuples, classes_for_base))
2084
2085 def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]:
2086 self._configs.pop(self._cls, None)
2087 return super().map(mapper_kw)
2088
2089
2090def _add_attribute(
2091 cls: Type[Any], key: str, value: MapperProperty[Any]
2092) -> None:
2093 """add an attribute to an existing declarative class.
2094
2095 This runs through the logic to determine MapperProperty,
2096 adds it to the Mapper, adds a column to the mapped Table, etc.
2097
2098 """
2099
2100 if "__mapper__" in cls.__dict__:
2101 mapped_cls = cast("MappedClassProtocol[Any]", cls)
2102
2103 def _table_or_raise(mc: MappedClassProtocol[Any]) -> Table:
2104 if isinstance(mc.__table__, Table):
2105 return mc.__table__
2106 raise exc.InvalidRequestError(
2107 f"Cannot add a new attribute to mapped class {mc.__name__!r} "
2108 "because it's not mapped against a table."
2109 )
2110
2111 if isinstance(value, Column):
2112 _undefer_column_name(key, value)
2113 _table_or_raise(mapped_cls).append_column(
2114 value, replace_existing=True
2115 )
2116 mapped_cls.__mapper__.add_property(key, value)
2117 elif isinstance(value, _MapsColumns):
2118 mp = value.mapper_property_to_assign
2119 for col, _ in value.columns_to_assign:
2120 _undefer_column_name(key, col)
2121 _table_or_raise(mapped_cls).append_column(
2122 col, replace_existing=True
2123 )
2124 if not mp:
2125 mapped_cls.__mapper__.add_property(key, col)
2126 if mp:
2127 mapped_cls.__mapper__.add_property(key, mp)
2128 elif isinstance(value, MapperProperty):
2129 mapped_cls.__mapper__.add_property(key, value)
2130 elif isinstance(value, QueryableAttribute) and value.key != key:
2131 # detect a QueryableAttribute that's already mapped being
2132 # assigned elsewhere in userland, turn into a synonym()
2133 value = SynonymProperty(value.key)
2134 mapped_cls.__mapper__.add_property(key, value)
2135 else:
2136 type.__setattr__(cls, key, value)
2137 mapped_cls.__mapper__._expire_memoizations()
2138 else:
2139 type.__setattr__(cls, key, value)
2140
2141
2142def _del_attribute(cls: Type[Any], key: str) -> None:
2143 if (
2144 "__mapper__" in cls.__dict__
2145 and key in cls.__dict__
2146 and not cast(
2147 "MappedClassProtocol[Any]", cls
2148 ).__mapper__._dispose_called
2149 ):
2150 value = cls.__dict__[key]
2151 if isinstance(
2152 value, (Column, _MapsColumns, MapperProperty, QueryableAttribute)
2153 ):
2154 raise NotImplementedError(
2155 "Can't un-map individual mapped attributes on a mapped class."
2156 )
2157 else:
2158 type.__delattr__(cls, key)
2159 cast(
2160 "MappedClassProtocol[Any]", cls
2161 ).__mapper__._expire_memoizations()
2162 else:
2163 type.__delattr__(cls, key)
2164
2165
2166def _declarative_constructor(self: Any, **kwargs: Any) -> None:
2167 """A simple constructor that allows initialization from kwargs.
2168
2169 Sets attributes on the constructed instance using the names and
2170 values in ``kwargs``.
2171
2172 Only keys that are present as
2173 attributes of the instance's class are allowed. These could be,
2174 for example, any mapped columns or relationships.
2175 """
2176 cls_ = type(self)
2177 for k in kwargs:
2178 if not hasattr(cls_, k):
2179 raise TypeError(
2180 "%r is an invalid keyword argument for %s" % (k, cls_.__name__)
2181 )
2182 setattr(self, k, kwargs[k])
2183
2184
2185_declarative_constructor.__name__ = "__init__"
2186
2187
2188def _undefer_column_name(key: str, column: Column[Any]) -> None:
2189 if column.key is None:
2190 column.key = key
2191 if column.name is None:
2192 column.name = key