1# orm/decl_base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Internal implementation for declarative."""
9
10from __future__ import annotations
11
12import collections
13import dataclasses
14import re
15from typing import Any
16from typing import Callable
17from typing import cast
18from typing import Dict
19from typing import Iterable
20from typing import List
21from typing import Mapping
22from typing import NamedTuple
23from typing import NoReturn
24from typing import Optional
25from typing import Sequence
26from typing import Tuple
27from typing import Type
28from typing import TYPE_CHECKING
29from typing import TypeVar
30from typing import Union
31import weakref
32
33from . import attributes
34from . import clsregistry
35from . import exc as orm_exc
36from . import instrumentation
37from . import mapperlib
38from ._typing import _O
39from ._typing import attr_is_internal_proxy
40from .attributes import InstrumentedAttribute
41from .attributes import QueryableAttribute
42from .base import _is_mapped_class
43from .base import InspectionAttr
44from .descriptor_props import CompositeProperty
45from .descriptor_props import SynonymProperty
46from .interfaces import _AttributeOptions
47from .interfaces import _DCAttributeOptions
48from .interfaces import _IntrospectsAnnotations
49from .interfaces import _MappedAttribute
50from .interfaces import _MapsColumns
51from .interfaces import MapperProperty
52from .mapper import Mapper
53from .properties import ColumnProperty
54from .properties import MappedColumn
55from .util import _extract_mapped_subtype
56from .util import _is_mapped_annotation
57from .util import class_mapper
58from .util import de_stringify_annotation
59from .. import event
60from .. import exc
61from .. import util
62from ..sql import expression
63from ..sql.base import _NoArg
64from ..sql.schema import Column
65from ..sql.schema import Table
66from ..util import topological
67from ..util.typing import _AnnotationScanType
68from ..util.typing import get_args
69from ..util.typing import is_fwd_ref
70from ..util.typing import is_literal
71from ..util.typing import Protocol
72from ..util.typing import TypedDict
73
74if TYPE_CHECKING:
75 from ._typing import _ClassDict
76 from ._typing import _RegistryType
77 from .base import Mapped
78 from .decl_api import declared_attr
79 from .instrumentation import ClassManager
80 from ..sql.elements import NamedColumn
81 from ..sql.schema import MetaData
82 from ..sql.selectable import FromClause
83
84_T = TypeVar("_T", bound=Any)
85
86_MapperKwArgs = Mapping[str, Any]
87_TableArgsType = Union[Tuple[Any, ...], Dict[str, Any]]
88
89
class MappedClassProtocol(Protocol[_O]):
    """A protocol representing a SQLAlchemy mapped class.

    The protocol is generic on the type of class, use
    ``MappedClassProtocol[Any]`` to allow any mapped class.

    The protocol declares the ``__name__``, ``__mapper__`` and ``__table__``
    class-level attributes, plus a constructor that accepts keyword
    arguments only and returns an instance of the mapped type.
    """

    # plain class name
    __name__: str
    # Mapper for the class; typed on the same class type as the protocol
    __mapper__: Mapper[_O]
    # selectable associated with the class
    __table__: FromClause

    def __call__(self, **kw: Any) -> _O: ...
102
103
class _DeclMappedClassProtocol(MappedClassProtocol[_O], Protocol):
    """Internal more detailed version of ``MappedClassProtocol``.

    Adds the declarative-specific names that this module reads from a
    class after a ``cast()`` to this protocol: the table / mapper
    configuration dunders, the optional dataclass transform arguments, and
    the ``__declare_first__`` / ``__declare_last__`` hooks.
    """

    metadata: MetaData
    __tablename__: str
    __mapper_args__: _MapperKwArgs
    __table_args__: Optional[_TableArgsType]

    # dataclass arguments attached to the class, or None when not present
    _sa_apply_dc_transforms: Optional[_DataclassArguments]

    def __declare_first__(self) -> None: ...

    def __declare_last__(self) -> None: ...
116
117
class _DataclassArguments(TypedDict):
    """Typed dictionary of dataclass setup arguments for a mapped class.

    This is the type of the ``_sa_apply_dc_transforms`` attribute declared
    on ``_DeclMappedClassProtocol``.  Every entry may hold the ``_NoArg``
    marker type in place of an actual value.
    """

    # NOTE(review): the boolean keys presumably mirror the parameters of
    # the same names accepted by ``dataclasses.dataclass()`` - confirm
    init: Union[_NoArg, bool]
    repr: Union[_NoArg, bool]
    eq: Union[_NoArg, bool]
    order: Union[_NoArg, bool]
    unsafe_hash: Union[_NoArg, bool]
    match_args: Union[_NoArg, bool]
    kw_only: Union[_NoArg, bool]
    # callable that returns a class; typed as ``Callable[..., Type[Any]]``
    dataclass_callable: Union[_NoArg, Callable[..., Type[Any]]]
127
128
def _declared_mapping_info(
    cls: Type[Any],
) -> Optional[Union[_DeferredMapperConfig, Mapper[Any]]]:
    """Return the mapping information currently available for ``cls``.

    :param cls: the class to look up.
    :return: the ``_DeferredMapperConfig`` for a class whose mapping is
     deferred, the ``Mapper`` (fetched without triggering configure) for a
     class that is already mapped, or ``None`` when neither applies.

    """
    # a deferred configuration takes precedence over a regular mapping
    if _DeferredMapperConfig.has_cls(cls):
        return _DeferredMapperConfig.config_for_cls(cls)

    if _is_mapped_class(cls):
        return class_mapper(cls, configure=False)

    return None
140
141
def _is_supercls_for_inherits(cls: Type[Any]) -> bool:
    """Tell whether ``cls`` qualifies as the superclass placed in 'inherits'.

    Classes with a deferred mapper config that is not mapped yet qualify,
    unless the class carries ``_sa_decl_prepare_nocascade`` (e.g.
    ``AbstractConcreteBase``); such concrete-only classes are not set up as
    "inherits" until after mappers are configured using
    mapper._set_concrete_base().  Classes that are already mapped always
    qualify; anything else does not.

    """
    if _DeferredMapperConfig.has_cls(cls):
        nocascade = _get_immediate_cls_attr(
            cls, "_sa_decl_prepare_nocascade", strict=True
        )
        return not nocascade

    # regular mapping, or not mapped at all
    return True if _is_mapped_class(cls) else False
162
163
164def _resolve_for_abstract_or_classical(cls: Type[Any]) -> Optional[Type[Any]]:
165 if cls is object:
166 return None
167
168 sup: Optional[Type[Any]]
169
170 if cls.__dict__.get("__abstract__", False):
171 for base_ in cls.__bases__:
172 sup = _resolve_for_abstract_or_classical(base_)
173 if sup is not None:
174 return sup
175 else:
176 return None
177 else:
178 clsmanager = _dive_for_cls_manager(cls)
179
180 if clsmanager:
181 return clsmanager.class_
182 else:
183 return cls
184
185
186def _get_immediate_cls_attr(
187 cls: Type[Any], attrname: str, strict: bool = False
188) -> Optional[Any]:
189 """return an attribute of the class that is either present directly
190 on the class, e.g. not on a superclass, or is from a superclass but
191 this superclass is a non-mapped mixin, that is, not a descendant of
192 the declarative base and is also not classically mapped.
193
194 This is used to detect attributes that indicate something about
195 a mapped class independently from any mapped classes that it may
196 inherit from.
197
198 """
199
200 # the rules are different for this name than others,
201 # make sure we've moved it out. transitional
202 assert attrname != "__abstract__"
203
204 if not issubclass(cls, object):
205 return None
206
207 if attrname in cls.__dict__:
208 return getattr(cls, attrname)
209
210 for base in cls.__mro__[1:]:
211 _is_classical_inherits = _dive_for_cls_manager(base) is not None
212
213 if attrname in base.__dict__ and (
214 base is cls
215 or (
216 (base in cls.__bases__ if strict else True)
217 and not _is_classical_inherits
218 )
219 ):
220 return getattr(base, attrname)
221 else:
222 return None
223
224
def _dive_for_cls_manager(cls: Type[_O]) -> Optional[ClassManager[_O]]:
    """Return the first class manager found along ``cls.__mro__``, if any.

    Because the class manager registration is pluggable, the search is
    performed for every class in the hierarchy, rather than relying on a
    simple "cls._sa_class_manager" lookup.

    """
    candidates = (
        attributes.opt_manager_of_class(klass) for klass in cls.__mro__
    )
    found: Optional[ClassManager[_O]] = next(
        (manager for manager in candidates if manager), None
    )
    return found
237
238
def _as_declarative(
    registry: _RegistryType, cls: Type[Any], dict_: _ClassDict
) -> Optional[_MapperConfig]:
    """Set up a declarative mapping for ``cls`` via
    ``_MapperConfig.setup_mapping()``.

    Declarative scans the class for attributes, so no table is passed and
    the mapper keyword arguments are an empty dictionary.

    """
    return _MapperConfig.setup_mapping(
        registry, cls, dict_, table=None, mapper_kw={}
    )
245
246
def _mapper(
    registry: _RegistryType,
    cls: Type[_O],
    table: Optional[FromClause],
    mapper_kw: _MapperKwArgs,
) -> Mapper[_O]:
    """Map ``cls`` imperatively and return the resulting mapper.

    Constructing ``_ImperativeMapperConfig`` performs the mapping; the
    mapper is then read back from the ``__mapper__`` attribute of the class.

    """
    _ImperativeMapperConfig(registry, cls, table, mapper_kw)

    mapped_cls = cast("MappedClassProtocol[_O]", cls)
    return mapped_cls.__mapper__
255
256
@util.preload_module("sqlalchemy.orm.decl_api")
def _is_declarative_props(obj: Any) -> bool:
    """Return True if ``obj`` is an instance of ``_declared_attr_common``
    (taken from the preloaded ``decl_api`` module) or of
    ``util.classproperty``.

    """
    decl_api = util.preloaded.orm_decl_api
    declarative_types = (decl_api._declared_attr_common, util.classproperty)

    return isinstance(obj, declarative_types)
262
263
def _check_declared_props_nocascade(
    obj: Any, name: str, cls: Type[_O]
) -> bool:
    """Report whether ``obj`` is a declarative property, warning if it
    is marked as cascading.

    :param obj: the attribute value found on a class.
    :param name: attribute name, used in the warning message only.
    :param cls: class being mapped, used in the warning message only.
    :return: True if ``obj`` satisfies ``_is_declarative_props()``, False
     otherwise.

    """
    if not _is_declarative_props(obj):
        return False

    if getattr(obj, "_cascading", False):
        util.warn(
            "@declared_attr.cascading is not supported on the %s "
            "attribute on class %s. This attribute invokes for "
            "subclasses in any case." % (name, cls)
        )

    return True
277
278
class _MapperConfig:
    """Base class for objects that configure the mapping of a single class.

    An instance records the target class, its name, the collection of
    properties gathered for the mapper, and a registry of invoked
    ``declared_attr`` objects.  Subclasses implement :meth:`.map`.

    """

    __slots__ = (
        "cls",
        "classname",
        "properties",
        "declared_attr_reg",
        "__weakref__",
    )

    cls: Type[Any]
    classname: str
    properties: util.OrderedDict[
        str,
        Union[
            Sequence[NamedColumn[Any]], NamedColumn[Any], MapperProperty[Any]
        ],
    ]
    declared_attr_reg: Dict[declared_attr[Any], Any]

    @classmethod
    def setup_mapping(
        cls,
        registry: _RegistryType,
        cls_: Type[_O],
        dict_: _ClassDict,
        table: Optional[FromClause],
        mapper_kw: _MapperKwArgs,
    ) -> Optional[_MapperConfig]:
        """Create the configuration object appropriate for ``cls_``.

        :param registry: registry the class is being mapped with.
        :param cls_: the class to be mapped.
        :param dict_: namespace dictionary of the class.
        :param table: optional table passed along to the config object.
        :param mapper_kw: keyword arguments intended for the mapper.
        :return: ``None`` when ``cls_`` declares ``__abstract__`` directly;
         a ``_DeferredMapperConfig`` when the class requests deferred
         mapping (``_sa_decl_prepare_nocascade`` / ``_sa_decl_prepare``);
         otherwise a ``_ClassScanMapperConfig``.
        :raises InvalidRequestError: if ``cls_`` itself already has a class
         manager established.

        """
        # the check must be made against the class being mapped; ``cls``
        # in this classmethod is the _MapperConfig class itself, for which
        # no manager is ever registered, so checking it would never detect
        # a class that was already instrumented.
        manager = attributes.opt_manager_of_class(cls_)
        if manager and manager.class_ is cls_:
            raise exc.InvalidRequestError(
                f"Class {cls_!r} already has been instrumented declaratively"
            )

        if cls_.__dict__.get("__abstract__", False):
            return None

        defer_map = _get_immediate_cls_attr(
            cls_, "_sa_decl_prepare_nocascade", strict=True
        ) or hasattr(cls_, "_sa_decl_prepare")

        if defer_map:
            return _DeferredMapperConfig(
                registry, cls_, dict_, table, mapper_kw
            )
        else:
            return _ClassScanMapperConfig(
                registry, cls_, dict_, table, mapper_kw
            )

    def __init__(
        self,
        registry: _RegistryType,
        cls_: Type[Any],
        mapper_kw: _MapperKwArgs,
    ):
        """Record the target class and register it for instrumentation.

        Unless ``mapper_kw`` requests ``non_primary``, the class is
        registered with the instrumentation system, passing this object as
        ``declarative_scan`` and the registry's constructor as the init
        method.

        :raises InvalidRequestError: if ``non_primary`` is requested but
         the class has no manager that reports itself as mapped.

        """
        self.cls = util.assert_arg_type(cls_, type, "cls_")
        self.classname = cls_.__name__
        self.properties = util.OrderedDict()
        self.declared_attr_reg = {}

        if not mapper_kw.get("non_primary", False):
            instrumentation.register_class(
                self.cls,
                finalize=False,
                registry=registry,
                declarative_scan=self,
                init_method=registry.constructor,
            )
        else:
            # a non primary mapper needs a primary mapping to be in place
            manager = attributes.opt_manager_of_class(self.cls)
            if not manager or not manager.is_mapped:
                raise exc.InvalidRequestError(
                    "Class %s has no primary mapper configured. Configure "
                    "a primary mapper first before setting up a non primary "
                    "Mapper." % self.cls
                )

    def set_cls_attribute(self, attrname: str, value: _T) -> _T:
        """Install ``value`` on the mapped class under ``attrname`` by way
        of the class manager, and return ``value``.

        """
        manager = instrumentation.manager_of_class(self.cls)
        manager.install_member(attrname, value)
        return value

    def map(self, mapper_kw: _MapperKwArgs = ...) -> Mapper[Any]:
        """Produce the mapper for the class; implemented by subclasses."""
        raise NotImplementedError()

    def _early_mapping(self, mapper_kw: _MapperKwArgs) -> None:
        """Invoke :meth:`.map` right away with ``mapper_kw``."""
        self.map(mapper_kw)
367
368
class _ImperativeMapperConfig(_MapperConfig):
    """Mapper configuration for a class mapped against an explicitly
    supplied table, without scanning the class for attributes.

    """

    __slots__ = ("local_table", "inherits")

    def __init__(
        self,
        registry: _RegistryType,
        cls_: Type[_O],
        table: Optional[FromClause],
        mapper_kw: _MapperKwArgs,
    ):
        """Install ``__table__`` on the class, then, while holding the
        mapper configure mutex, add the class to the class registry (for
        primary mappings only), work out inheritance and map the class.

        """
        super().__init__(registry, cls_, mapper_kw)

        self.local_table = self.set_cls_attribute("__table__", table)

        is_primary = not mapper_kw.get("non_primary", False)

        with mapperlib._CONFIGURE_MUTEX:
            if is_primary:
                clsregistry.add_class(
                    self.classname, self.cls, registry._class_registry
                )

            self._setup_inheritance(mapper_kw)

            self._early_mapping(mapper_kw)

    def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]:
        """Construct the ``Mapper`` for the class against the local table,
        install it as ``__mapper__`` and return it.

        """
        new_mapper = Mapper(self.cls, self.local_table, **mapper_kw)
        return self.set_cls_attribute("__mapper__", new_mapper)

    def _setup_inheritance(self, mapper_kw: _MapperKwArgs) -> None:
        """Determine the superclass to inherit from and store it as
        ``self.inherits``.

        An explicit ``inherits`` entry in ``mapper_kw`` wins; a ``Mapper``
        given there is replaced by its class.  Otherwise the direct bases
        of the class are searched for mapped superclasses.

        :raises InvalidRequestError: if more than one mapped base is found.

        """
        explicit = mapper_kw.get("inherits")

        if isinstance(explicit, Mapper):
            self.inherits = explicit.class_
            return

        if explicit is not None:
            self.inherits = explicit
            return

        target = self.cls

        # since we search for classical mappings now, search for
        # multiple mapped bases as well and raise an error.
        candidates = []
        for parent in target.__bases__:
            resolved = _resolve_for_abstract_or_classical(parent)
            if (
                resolved is not None
                and _is_supercls_for_inherits(resolved)
                and resolved not in candidates
            ):
                candidates.append(resolved)

        if len(candidates) > 1:
            raise exc.InvalidRequestError(
                "Class %s has multiple mapped bases: %r"
                % (target, candidates)
            )

        self.inherits = candidates[0] if candidates else None
429
430
class _CollectedAnnotation(NamedTuple):
    """Record stored per attribute name in
    ``_ClassScanMapperConfig.collected_annotations``.

    """

    # the annotation as scanned
    raw_annotation: _AnnotationScanType
    # ``Mapped`` container type, or None; the attribute scan treats a
    # non-None value here as meaning the annotation is mapped
    mapped_container: Optional[Type[Mapped[Any]]]
    # NOTE(review): presumably the annotation with the Mapped[] container
    # removed - confirm against _collect_annotation()
    extracted_mapped_annotation: Union[_AnnotationScanType, str]
    is_dataclass: bool
    # attribute value collected along with the annotation; may be None
    attr_value: Any
    originating_module: str
    # class the annotation was collected from
    originating_class: Type[Any]
439
440
441class _ClassScanMapperConfig(_MapperConfig):
442 __slots__ = (
443 "registry",
444 "clsdict_view",
445 "collected_attributes",
446 "collected_annotations",
447 "local_table",
448 "persist_selectable",
449 "declared_columns",
450 "column_ordering",
451 "column_copies",
452 "table_args",
453 "tablename",
454 "mapper_args",
455 "mapper_args_fn",
456 "table_fn",
457 "inherits",
458 "single",
459 "allow_dataclass_fields",
460 "dataclass_setup_arguments",
461 "is_dataclass_prior_to_mapping",
462 "allow_unmapped_annotations",
463 )
464
465 is_deferred = False
466 registry: _RegistryType
467 clsdict_view: _ClassDict
468 collected_annotations: Dict[str, _CollectedAnnotation]
469 collected_attributes: Dict[str, Any]
470 local_table: Optional[FromClause]
471 persist_selectable: Optional[FromClause]
472 declared_columns: util.OrderedSet[Column[Any]]
473 column_ordering: Dict[Column[Any], int]
474 column_copies: Dict[
475 Union[MappedColumn[Any], Column[Any]],
476 Union[MappedColumn[Any], Column[Any]],
477 ]
478 tablename: Optional[str]
479 mapper_args: Mapping[str, Any]
480 table_args: Optional[_TableArgsType]
481 mapper_args_fn: Optional[Callable[[], Dict[str, Any]]]
482 inherits: Optional[Type[Any]]
483 single: bool
484
485 is_dataclass_prior_to_mapping: bool
486 allow_unmapped_annotations: bool
487
488 dataclass_setup_arguments: Optional[_DataclassArguments]
489 """if the class has SQLAlchemy native dataclass parameters, where
490 we will turn the class into a dataclass within the declarative mapping
491 process.
492
493 """
494
495 allow_dataclass_fields: bool
496 """if true, look for dataclass-processed Field objects on the target
497 class as well as superclasses and extract ORM mapping directives from
498 the "metadata" attribute of each Field.
499
500 if False, dataclass fields can still be used, however they won't be
501 mapped.
502
503 """
504
505 def __init__(
506 self,
507 registry: _RegistryType,
508 cls_: Type[_O],
509 dict_: _ClassDict,
510 table: Optional[FromClause],
511 mapper_kw: _MapperKwArgs,
512 ):
513 # grab class dict before the instrumentation manager has been added.
514 # reduces cycles
515 self.clsdict_view = (
516 util.immutabledict(dict_) if dict_ else util.EMPTY_DICT
517 )
518 super().__init__(registry, cls_, mapper_kw)
519 self.registry = registry
520 self.persist_selectable = None
521
522 self.collected_attributes = {}
523 self.collected_annotations = {}
524 self.declared_columns = util.OrderedSet()
525 self.column_ordering = {}
526 self.column_copies = {}
527 self.single = False
528 self.dataclass_setup_arguments = dca = getattr(
529 self.cls, "_sa_apply_dc_transforms", None
530 )
531
532 self.allow_unmapped_annotations = getattr(
533 self.cls, "__allow_unmapped__", False
534 ) or bool(self.dataclass_setup_arguments)
535
536 self.is_dataclass_prior_to_mapping = cld = dataclasses.is_dataclass(
537 cls_
538 )
539
540 sdk = _get_immediate_cls_attr(cls_, "__sa_dataclass_metadata_key__")
541
542 # we don't want to consume Field objects from a not-already-dataclass.
543 # the Field objects won't have their "name" or "type" populated,
544 # and while it seems like we could just set these on Field as we
545 # read them, Field is documented as "user read only" and we need to
546 # stay far away from any off-label use of dataclasses APIs.
547 if (not cld or dca) and sdk:
548 raise exc.InvalidRequestError(
549 "SQLAlchemy mapped dataclasses can't consume mapping "
550 "information from dataclass.Field() objects if the immediate "
551 "class is not already a dataclass."
552 )
553
554 # if already a dataclass, and __sa_dataclass_metadata_key__ present,
555 # then also look inside of dataclass.Field() objects yielded by
556 # dataclasses.get_fields(cls) when scanning for attributes
557 self.allow_dataclass_fields = bool(sdk and cld)
558
559 self._setup_declared_events()
560
561 self._scan_attributes()
562
563 self._setup_dataclasses_transforms()
564
565 with mapperlib._CONFIGURE_MUTEX:
566 clsregistry.add_class(
567 self.classname, self.cls, registry._class_registry
568 )
569
570 self._setup_inheriting_mapper(mapper_kw)
571
572 self._extract_mappable_attributes()
573
574 self._extract_declared_columns()
575
576 self._setup_table(table)
577
578 self._setup_inheriting_columns(mapper_kw)
579
580 self._early_mapping(mapper_kw)
581
582 def _setup_declared_events(self) -> None:
583 if _get_immediate_cls_attr(self.cls, "__declare_last__"):
584
585 @event.listens_for(Mapper, "after_configured")
586 def after_configured() -> None:
587 cast(
588 "_DeclMappedClassProtocol[Any]", self.cls
589 ).__declare_last__()
590
591 if _get_immediate_cls_attr(self.cls, "__declare_first__"):
592
593 @event.listens_for(Mapper, "before_configured")
594 def before_configured() -> None:
595 cast(
596 "_DeclMappedClassProtocol[Any]", self.cls
597 ).__declare_first__()
598
599 def _cls_attr_override_checker(
600 self, cls: Type[_O]
601 ) -> Callable[[str, Any], bool]:
602 """Produce a function that checks if a class has overridden an
603 attribute, taking SQLAlchemy-enabled dataclass fields into account.
604
605 """
606
607 if self.allow_dataclass_fields:
608 sa_dataclass_metadata_key = _get_immediate_cls_attr(
609 cls, "__sa_dataclass_metadata_key__"
610 )
611 else:
612 sa_dataclass_metadata_key = None
613
614 if not sa_dataclass_metadata_key:
615
616 def attribute_is_overridden(key: str, obj: Any) -> bool:
617 return getattr(cls, key, obj) is not obj
618
619 else:
620 all_datacls_fields = {
621 f.name: f.metadata[sa_dataclass_metadata_key]
622 for f in util.dataclass_fields(cls)
623 if sa_dataclass_metadata_key in f.metadata
624 }
625 local_datacls_fields = {
626 f.name: f.metadata[sa_dataclass_metadata_key]
627 for f in util.local_dataclass_fields(cls)
628 if sa_dataclass_metadata_key in f.metadata
629 }
630
631 absent = object()
632
633 def attribute_is_overridden(key: str, obj: Any) -> bool:
634 if _is_declarative_props(obj):
635 obj = obj.fget
636
637 # this function likely has some failure modes still if
638 # someone is doing a deep mixing of the same attribute
639 # name as plain Python attribute vs. dataclass field.
640
641 ret = local_datacls_fields.get(key, absent)
642 if _is_declarative_props(ret):
643 ret = ret.fget
644
645 if ret is obj:
646 return False
647 elif ret is not absent:
648 return True
649
650 all_field = all_datacls_fields.get(key, absent)
651
652 ret = getattr(cls, key, obj)
653
654 if ret is obj:
655 return False
656
657 # for dataclasses, this could be the
658 # 'default' of the field. so filter more specifically
659 # for an already-mapped InstrumentedAttribute
660 if ret is not absent and isinstance(
661 ret, InstrumentedAttribute
662 ):
663 return True
664
665 if all_field is obj:
666 return False
667 elif all_field is not absent:
668 return True
669
670 # can't find another attribute
671 return False
672
673 return attribute_is_overridden
674
    # dunder names that the attribute scan still considers, even though
    # they match the exclusion pattern below
    _include_dunders = {
        "__table__",
        "__mapper_args__",
        "__tablename__",
        "__table_args__",
    }

    # names beginning with "_sa_" or "__" are left out of the attribute scan
    _match_exclude_dunders = re.compile(r"^(?:_sa_|__)")
683
684 def _cls_attr_resolver(
685 self, cls: Type[Any]
686 ) -> Callable[[], Iterable[Tuple[str, Any, Any, bool]]]:
687 """produce a function to iterate the "attributes" of a class
688 which we want to consider for mapping, adjusting for SQLAlchemy fields
689 embedded in dataclass fields.
690
691 """
692 cls_annotations = util.get_annotations(cls)
693
694 cls_vars = vars(cls)
695
696 _include_dunders = self._include_dunders
697 _match_exclude_dunders = self._match_exclude_dunders
698
699 names = [
700 n
701 for n in util.merge_lists_w_ordering(
702 list(cls_vars), list(cls_annotations)
703 )
704 if not _match_exclude_dunders.match(n) or n in _include_dunders
705 ]
706
707 if self.allow_dataclass_fields:
708 sa_dataclass_metadata_key: Optional[str] = _get_immediate_cls_attr(
709 cls, "__sa_dataclass_metadata_key__"
710 )
711 else:
712 sa_dataclass_metadata_key = None
713
714 if not sa_dataclass_metadata_key:
715
716 def local_attributes_for_class() -> (
717 Iterable[Tuple[str, Any, Any, bool]]
718 ):
719 return (
720 (
721 name,
722 cls_vars.get(name),
723 cls_annotations.get(name),
724 False,
725 )
726 for name in names
727 )
728
729 else:
730 dataclass_fields = {
731 field.name: field for field in util.local_dataclass_fields(cls)
732 }
733
734 fixed_sa_dataclass_metadata_key = sa_dataclass_metadata_key
735
736 def local_attributes_for_class() -> (
737 Iterable[Tuple[str, Any, Any, bool]]
738 ):
739 for name in names:
740 field = dataclass_fields.get(name, None)
741 if field and sa_dataclass_metadata_key in field.metadata:
742 yield field.name, _as_dc_declaredattr(
743 field.metadata, fixed_sa_dataclass_metadata_key
744 ), cls_annotations.get(field.name), True
745 else:
746 yield name, cls_vars.get(name), cls_annotations.get(
747 name
748 ), False
749
750 return local_attributes_for_class
751
    def _scan_attributes(self) -> None:
        """Scan the class and its ``__mro__`` for mapping-related attributes.

        The hierarchy is walked twice.  The first pass records, for every
        class in the ``__mro__``, whether it counts as a mapped superclass,
        the attribute resolver for it, and the column copies produced for
        non-mapped bases.  The second pass examines each local attribute:
        the four special dunder names are resolved into table / mapper
        configuration, while other attributes are gathered into
        ``collected_attributes``, ``column_copies`` and the collected
        annotations.

        On completion, ``table_args``, ``tablename``, ``mapper_args_fn``
        and ``table_fn`` are assigned on this object.

        :raises ArgumentError: if ``__table_args__`` is not a tuple, dict
         or None.
        :raises InvalidRequestError: if a ``MapperProperty`` is declared
         directly on a class that is acting as a mixin.

        """
        cls = self.cls

        cls_as_Decl = cast("_DeclMappedClassProtocol[Any]", cls)

        clsdict_view = self.clsdict_view
        collected_attributes = self.collected_attributes
        column_copies = self.column_copies
        _include_dunders = self._include_dunders
        mapper_args_fn = None
        table_args = inherited_table_args = None
        table_fn = None
        tablename = None
        # True when the class itself states __table__ in its namespace
        fixed_table = "__table__" in clsdict_view

        attribute_is_overridden = self._cls_attr_override_checker(self.cls)

        bases = []

        for base in cls.__mro__:
            # collect bases and make sure standalone columns are copied
            # to be the column they will ultimately be on the class,
            # so that declared_attr functions use the right columns.
            # need to do this all the way up the hierarchy first
            # (see #8190)

            class_mapped = base is not cls and _is_supercls_for_inherits(base)

            local_attributes_for_class = self._cls_attr_resolver(base)

            if not class_mapped and base is not cls:
                locally_collected_columns = self._produce_column_copies(
                    local_attributes_for_class,
                    attribute_is_overridden,
                    fixed_table,
                    base,
                )
            else:
                locally_collected_columns = {}

            bases.append(
                (
                    base,
                    class_mapped,
                    local_attributes_for_class,
                    locally_collected_columns,
                )
            )

        for (
            base,
            class_mapped,
            local_attributes_for_class,
            locally_collected_columns,
        ) in bases:
            # this transfer can also take place as we scan each name
            # for finer-grained control of how collected_attributes is
            # populated, as this is what impacts column ordering.
            # however it's simpler to get it out of the way here.
            collected_attributes.update(locally_collected_columns)

            for (
                name,
                obj,
                annotation,
                is_dataclass_field,
            ) in local_attributes_for_class():
                if name in _include_dunders:
                    if name == "__mapper_args__":
                        check_decl = _check_declared_props_nocascade(
                            obj, name, cls
                        )
                        if not mapper_args_fn and (
                            not class_mapped or check_decl
                        ):
                            # don't even invoke __mapper_args__ until
                            # after we've determined everything about the
                            # mapped table.
                            # make a copy of it so a class-level dictionary
                            # is not overwritten when we update column-based
                            # arguments.
                            def _mapper_args_fn() -> Dict[str, Any]:
                                return dict(cls_as_Decl.__mapper_args__)

                            mapper_args_fn = _mapper_args_fn

                    elif name == "__tablename__":
                        check_decl = _check_declared_props_nocascade(
                            obj, name, cls
                        )
                        if not tablename and (not class_mapped or check_decl):
                            tablename = cls_as_Decl.__tablename__
                    elif name == "__table__":
                        check_decl = _check_declared_props_nocascade(
                            obj, name, cls
                        )
                        # if a @declared_attr using "__table__" is detected,
                        # wrap up a callable to look for "__table__" from
                        # the final concrete class when we set up a table.
                        # this was fixed by
                        # #11509, regression in 2.0 from version 1.4.
                        if check_decl and not table_fn:
                            # don't even invoke __table__ until we're ready
                            def _table_fn() -> FromClause:
                                return cls_as_Decl.__table__

                            table_fn = _table_fn

                    elif name == "__table_args__":
                        check_decl = _check_declared_props_nocascade(
                            obj, name, cls
                        )
                        if not table_args and (not class_mapped or check_decl):
                            table_args = cls_as_Decl.__table_args__
                            if not isinstance(
                                table_args, (tuple, dict, type(None))
                            ):
                                raise exc.ArgumentError(
                                    "__table_args__ value must be a tuple, "
                                    "dict, or None"
                                )
                            if base is not cls:
                                # remember that the value came from a
                                # superclass; see the check after the loops
                                inherited_table_args = True
                    else:
                        # any other dunder names; should not be here
                        # as we have tested for all four names in
                        # _include_dunders
                        assert False
                elif class_mapped:
                    if _is_declarative_props(obj) and not obj._quiet:
                        util.warn(
                            "Regular (i.e. not __special__) "
                            "attribute '%s.%s' uses @declared_attr, "
                            "but owning class %s is mapped - "
                            "not applying to subclass %s."
                            % (base.__name__, name, base, cls)
                        )

                    continue
                elif base is not cls:
                    # we're a mixin, abstract base, or something that is
                    # acting like that for now.

                    if isinstance(obj, (Column, MappedColumn)):
                        # already copied columns to the mapped class.
                        continue
                    elif isinstance(obj, MapperProperty):
                        raise exc.InvalidRequestError(
                            "Mapper properties (i.e. deferred,"
                            "column_property(), relationship(), etc.) must "
                            "be declared as @declared_attr callables "
                            "on declarative mixin classes. For dataclass "
                            "field() objects, use a lambda:"
                        )
                    elif _is_declarative_props(obj):
                        # tried to get overloads to tell this to
                        # pylance, no luck
                        assert obj is not None

                        if obj._cascading:
                            if name in clsdict_view:
                                # unfortunately, while we can use the user-
                                # defined attribute here to allow a clean
                                # override, if there's another
                                # subclass below then it still tries to use
                                # this. not sure if there is enough
                                # information here to add this as a feature
                                # later on.
                                util.warn(
                                    "Attribute '%s' on class %s cannot be "
                                    "processed due to "
                                    "@declared_attr.cascading; "
                                    "skipping" % (name, cls)
                                )
                            collected_attributes[name] = column_copies[obj] = (
                                ret
                            ) = obj.__get__(obj, cls)
                            setattr(cls, name, ret)
                        else:
                            if is_dataclass_field:
                                # access attribute using normal class access
                                # first, to see if it's been mapped on a
                                # superclass. note if the dataclasses.field()
                                # has "default", this value can be anything.
                                ret = getattr(cls, name, None)

                                # so, if it's anything that's not ORM
                                # mapped, assume we should invoke the
                                # declared_attr
                                if not isinstance(ret, InspectionAttr):
                                    ret = obj.fget()
                            else:
                                # access attribute using normal class access.
                                # if the declared attr already took place
                                # on a superclass that is mapped, then
                                # this is no longer a declared_attr, it will
                                # be the InstrumentedAttribute
                                ret = getattr(cls, name)

                            # correct for proxies created from hybrid_property
                            # or similar. note there is no known case that
                            # produces nested proxies, so we are only
                            # looking one level deep right now.

                            if (
                                isinstance(ret, InspectionAttr)
                                and attr_is_internal_proxy(ret)
                                and not isinstance(
                                    ret.original_property, MapperProperty
                                )
                            ):
                                ret = ret.descriptor

                            collected_attributes[name] = column_copies[obj] = (
                                ret
                            )

                        # carry the docstring of the declared_attr over to
                        # the produced object when it has no doc of its own
                        if (
                            isinstance(ret, (Column, MapperProperty))
                            and ret.doc is None
                        ):
                            ret.doc = obj.__doc__

                        self._collect_annotation(
                            name,
                            obj._collect_return_annotation(),
                            base,
                            True,
                            obj,
                        )
                    elif _is_mapped_annotation(annotation, cls, base):
                        # Mapped annotation without any object.
                        # product_column_copies should have handled this.
                        # if future support for other MapperProperty,
                        # then test if this name is already handled and
                        # otherwise proceed to generate.
                        if not fixed_table:
                            assert (
                                name in collected_attributes
                                or attribute_is_overridden(name, None)
                            )
                        continue
                    else:
                        # here, the attribute is some other kind of
                        # property that we assume is not part of the
                        # declarative mapping. however, check for some
                        # more common mistakes
                        self._warn_for_decl_attributes(base, name, obj)
                elif is_dataclass_field and (
                    name not in clsdict_view or clsdict_view[name] is not obj
                ):
                    # here, we are definitely looking at the target class
                    # and not a superclass. this is currently a
                    # dataclass-only path. if the name is only
                    # a dataclass field and isn't in local cls.__dict__,
                    # put the object there.
                    # assert that the dataclass-enabled resolver agrees
                    # with what we are seeing

                    assert not attribute_is_overridden(name, obj)

                    if _is_declarative_props(obj):
                        obj = obj.fget()

                    collected_attributes[name] = obj
                    self._collect_annotation(
                        name, annotation, base, False, obj
                    )
                else:
                    collected_annotation = self._collect_annotation(
                        name, annotation, base, None, obj
                    )
                    is_mapped = (
                        collected_annotation is not None
                        and collected_annotation.mapped_container is not None
                    )
                    generated_obj = (
                        collected_annotation.attr_value
                        if collected_annotation is not None
                        else obj
                    )
                    if obj is None and not fixed_table and is_mapped:
                        collected_attributes[name] = (
                            generated_obj
                            if generated_obj is not None
                            else MappedColumn()
                        )
                    elif name in clsdict_view:
                        collected_attributes[name] = obj
                    # else if the name is not in the cls.__dict__,
                    # don't collect it as an attribute.
                    # we will see the annotation only, which is meaningful
                    # both for mapping and dataclasses setup

        # table args that came from a superclass are dropped when no
        # table name was found
        if inherited_table_args and not tablename:
            table_args = None

        self.table_args = table_args
        self.tablename = tablename
        self.mapper_args_fn = mapper_args_fn
        self.table_fn = table_fn
1053
def _setup_dataclasses_transforms(self) -> None:
    """Turn ``self.cls`` into a dataclass if dataclass arguments were given.

    Returns immediately when ``self.dataclass_setup_arguments`` is empty.
    Otherwise one field entry is produced per collected annotation using
    ``_AttributeOptions._get_arguments_for_make_dataclass()``; entries that
    carry a third element have that element assigned onto the class under
    the field name, and the resulting name -> type mapping is handed to
    ``_apply_dataclasses_to_any_class()``.

    :raises sqlalchemy.exc.InvalidRequestError: if ``self.cls`` already has
     ``__dataclass_fields__`` or ``__table__`` in its own ``__dict__``.

    """
    dataclass_setup_arguments = self.dataclass_setup_arguments
    if not dataclass_setup_arguments:
        return

    # can't use is_dataclass since it uses hasattr
    if "__dataclass_fields__" in self.cls.__dict__:
        raise exc.InvalidRequestError(
            f"Class {self.cls} is already a dataclass; ensure that "
            "base classes / decorator styles of establishing dataclasses "
            "are not being mixed. "
            "This can happen if a class that inherits from "
            "'MappedAsDataclass', even indirectly, is been mapped with "
            "'@registry.mapped_as_dataclass'"
        )

    # can't create a dataclass if __table__ is already there. This would
    # fail an assertion when calling _get_arguments_for_make_dataclass:
    # assert False, "Mapped[] received without a mapping declaration"
    if "__table__" in self.cls.__dict__:
        raise exc.InvalidRequestError(
            f"Class {self.cls} already defines a '__table__'. "
            "ORM Annotated Dataclasses do not support a pre-existing "
            "'__table__' element"
        )

    # originating class -> attribute names that came from a superclass
    # which does not itself have ``__dataclass_fields__``
    warn_for_non_dc_attrs = collections.defaultdict(list)

    def _allow_dataclass_field(
        key: str, originating_class: Type[Any]
    ) -> bool:
        # never filters anything out (always returns True); it is used
        # inside the comprehension below only to record names for the
        # deprecation warning emitted afterwards
        if (
            originating_class is not self.cls
            and "__dataclass_fields__" not in originating_class.__dict__
        ):
            warn_for_non_dc_attrs[originating_class].append(key)

        return True

    # the manager is not used further in this method; only its presence
    # is asserted
    manager = instrumentation.manager_of_class(self.cls)
    assert manager is not None

    # the inner generator selects (key, annotation, mapped_container) from
    # each collected annotation, preferring the extracted mapped
    # annotation over the raw one when the former is truthy; the outer
    # comprehension turns each into a dataclass field entry
    field_list = [
        _AttributeOptions._get_arguments_for_make_dataclass(
            key,
            anno,
            mapped_container,
            self.collected_attributes.get(key, _NoArg.NO_ARG),
        )
        for key, anno, mapped_container in (
            (
                key,
                mapped_anno if mapped_anno else raw_anno,
                mapped_container,
            )
            for key, (
                raw_anno,
                mapped_container,
                mapped_anno,
                is_dc,
                attr_value,
                originating_module,
                originating_class,
            ) in self.collected_annotations.items()
            if _allow_dataclass_field(key, originating_class)
            and (
                key not in self.collected_attributes
                # issue #9226; check for attributes that we've collected
                # which are already instrumented, which we would assume
                # mean we are in an ORM inheritance mapping and this
                # attribute is already mapped on the superclass. Under
                # no circumstance should any QueryableAttribute be sent to
                # the dataclass() function; anything that's mapped should
                # be Field and that's it
                or not isinstance(
                    self.collected_attributes[key], QueryableAttribute
                )
            )
        )
    ]

    # one deprecation warning per non-dataclass originating class
    if warn_for_non_dc_attrs:
        for (
            originating_class,
            non_dc_attrs,
        ) in warn_for_non_dc_attrs.items():
            util.warn_deprecated(
                f"When transforming {self.cls} to a dataclass, "
                f"attribute(s) "
                f"{', '.join(repr(key) for key in non_dc_attrs)} "
                f"originates from superclass "
                f"{originating_class}, which is not a dataclass. This "
                f"usage is deprecated and will raise an error in "
                f"SQLAlchemy 2.1. When declaring SQLAlchemy Declarative "
                f"Dataclasses, ensure that all mixin classes and other "
                f"superclasses which include attributes are also a "
                f"subclass of MappedAsDataclass.",
                "2.0",
                code="dcmx",
            )

    # split the field entries: 2-tuples are (name, type); 3-tuples add a
    # third element which is set on the class below
    annotations = {}
    defaults = {}
    for item in field_list:
        if len(item) == 2:
            name, tp = item
        elif len(item) == 3:
            name, tp, spec = item
            defaults[name] = spec
        else:
            assert False
        annotations[name] = tp

    # place the third elements on the class before the dataclass callable
    # runs, so that it finds them as the class-level values of the fields
    for k, v in defaults.items():
        setattr(self.cls, k, v)

    self._apply_dataclasses_to_any_class(
        dataclass_setup_arguments, self.cls, annotations
    )
1173
@classmethod
def _update_annotations_for_non_mapped_class(
    cls, klass: Type[_O]
) -> Mapping[str, _AnnotationScanType]:
    """Return the annotations of ``klass`` with mapped containers removed.

    Annotations that ``_is_mapped_annotation()`` does not recognize are
    passed through unchanged.  Recognized annotations are replaced by the
    first element returned from ``_extract_mapped_subtype()``; when that
    function returns a falsy result the name is left out entirely.

    :param klass: the class whose annotations are read.
    :return: a new dictionary of attribute name to annotation.

    """
    unwrapped = {}
    for attr_name, raw_anno in util.get_annotations(klass).items():
        if not _is_mapped_annotation(raw_anno, klass, klass):
            # plain annotation; keep as is
            unwrapped[attr_name] = raw_anno
            continue

        extracted = _extract_mapped_subtype(
            raw_anno,
            klass,
            klass.__module__,
            attr_name,
            type(None),
            required=False,
            is_dataclass_field=False,
            expect_mapped=False,
        )
        if not extracted:
            continue

        inner_type, _container = extracted
        unwrapped[attr_name] = inner_type

    return unwrapped
1199
@classmethod
def _apply_dataclasses_to_any_class(
    cls,
    dataclass_setup_arguments: _DataclassArguments,
    klass: Type[_O],
    use_annotations: Mapping[str, _AnnotationScanType],
) -> None:
    """Run the dataclass callable against ``klass``.

    :param dataclass_setup_arguments: dataclass arguments; validated with
     ``_assert_dc_arguments()``.  The ``"dataclass_callable"`` entry
     selects the callable (``dataclasses.dataclass`` when it is
     ``_NoArg.NO_ARG``); every other entry that is not ``_NoArg.NO_ARG``
     is passed to it as a keyword argument.
    :param klass: the class to transform in place.
    :param use_annotations: when non-empty, installed as
     ``klass.__annotations__`` for the duration of the call only.
    :raises sqlalchemy.exc.InvalidRequestError: when the dataclass
     callable raises ``TypeError`` or ``ValueError``.

    """
    cls._assert_dc_arguments(dataclass_setup_arguments)

    make_dataclass = dataclass_setup_arguments["dataclass_callable"]
    if make_dataclass is _NoArg.NO_ARG:
        make_dataclass = dataclasses.dataclass

    original_annotations: Optional[Any] = None
    if use_annotations:
        # apply constructed annotations that should look "normal" to a
        # dataclasses callable, based on the fields present. This
        # means remove the Mapped[] container and ensure all Field
        # entries have an annotation
        original_annotations = getattr(klass, "__annotations__", None)
        klass.__annotations__ = cast("Dict[str, Any]", use_annotations)

    try:
        dc_kwargs = {}
        for arg_name, arg_value in dataclass_setup_arguments.items():
            if arg_name == "dataclass_callable":
                continue
            if arg_value is _NoArg.NO_ARG:
                continue
            dc_kwargs[arg_name] = arg_value

        make_dataclass(klass, **dc_kwargs)
    except (TypeError, ValueError) as ex:
        raise exc.InvalidRequestError(
            f"Python dataclasses error encountered when creating "
            f"dataclass for {klass.__name__!r}: "
            f"{ex!r}. Please refer to Python dataclasses "
            "documentation for additional information.",
            code="dcte",
        ) from ex
    finally:
        # restore original annotations outside of the dataclasses
        # process; for mixins and __abstract__ superclasses, SQLAlchemy
        # Declarative will need to see the Mapped[] container inside the
        # annotations in order to map subclasses
        if use_annotations:
            if original_annotations is not None:
                klass.__annotations__ = original_annotations
            else:
                del klass.__annotations__
1252
1253 @classmethod
1254 def _assert_dc_arguments(cls, arguments: _DataclassArguments) -> None:
1255 allowed = {
1256 "init",
1257 "repr",
1258 "order",
1259 "eq",
1260 "unsafe_hash",
1261 "kw_only",
1262 "match_args",
1263 "dataclass_callable",
1264 }
1265 disallowed_args = set(arguments).difference(allowed)
1266 if disallowed_args:
1267 msg = ", ".join(f"{arg!r}" for arg in sorted(disallowed_args))
1268 raise exc.ArgumentError(
1269 f"Dataclass argument(s) {msg} are not accepted"
1270 )
1271
def _collect_annotation(
    self,
    name: str,
    raw_annotation: _AnnotationScanType,
    originating_class: Type[Any],
    expect_mapped: Optional[bool],
    attr_value: Any,
) -> Optional[_CollectedAnnotation]:
    """Record and return the annotation collected for attribute ``name``.

    Results are memoized in ``self.collected_annotations``; a name that
    was already collected returns the stored entry without re-evaluating
    any of the other arguments.

    :param name: attribute name the annotation belongs to.
    :param raw_annotation: the annotation as found, or ``None`` when there
     is none.
    :param originating_class: class the annotation came from; its
     ``__module__`` is passed along for annotation resolution.
    :param expect_mapped: combined with "class is not already a
     dataclass" and passed to ``_extract_mapped_subtype()``.  When
     ``None`` it is derived from ``attr_value`` and
     ``self.allow_unmapped_annotations``.
    :param attr_value: value assigned to the attribute, or ``None``.
    :return: the ``_CollectedAnnotation`` stored for ``name``, or ``None``
     when ``raw_annotation`` is ``None`` or
     ``_extract_mapped_subtype()`` returns ``None``.

    """
    if name in self.collected_annotations:
        return self.collected_annotations[name]

    if raw_annotation is None:
        return None

    is_dataclass = self.is_dataclass_prior_to_mapping
    allow_unmapped = self.allow_unmapped_annotations

    if expect_mapped is None:
        # caller did not decide; expect a mapped container only for
        # attributes with no value or an ORM mapped attribute, and only
        # when unmapped annotations are not allowed
        is_dataclass_field = isinstance(attr_value, dataclasses.Field)
        expect_mapped = (
            not is_dataclass_field
            and not allow_unmapped
            and (
                attr_value is None
                or isinstance(attr_value, _MappedAttribute)
            )
        )

    # NOTE: unconditionally reset; the value computed above only
    # influences ``expect_mapped`` and is never passed on
    is_dataclass_field = False
    extracted = _extract_mapped_subtype(
        raw_annotation,
        self.cls,
        originating_class.__module__,
        name,
        type(attr_value),
        required=False,
        is_dataclass_field=is_dataclass_field,
        expect_mapped=expect_mapped and not is_dataclass,
    )
    if extracted is None:
        # ClassVar can come out here
        return None

    extracted_mapped_annotation, mapped_container = extracted

    if attr_value is None and not is_literal(extracted_mapped_annotation):
        for elem in get_args(extracted_mapped_annotation):
            if is_fwd_ref(
                elem, check_generic=True, check_for_plain_string=True
            ):
                elem = de_stringify_annotation(
                    self.cls,
                    elem,
                    originating_class.__module__,
                    include_generic=True,
                )
            # look in Annotated[...] for an ORM construct,
            # such as Annotated[int, mapped_column(primary_key=True)]
            if isinstance(elem, _IntrospectsAnnotations):
                # the loop does not break, so the last matching element
                # wins if there is more than one
                attr_value = elem.found_in_pep593_annotated()

    self.collected_annotations[name] = ca = _CollectedAnnotation(
        raw_annotation,
        mapped_container,
        extracted_mapped_annotation,
        is_dataclass,
        attr_value,
        originating_class.__module__,
        originating_class,
    )
    return ca
1343
def _warn_for_decl_attributes(
    self, cls: Type[Any], key: str, c: Any
) -> None:
    """Warn when ``c`` is a SQL expression that will not be mapped.

    :param cls: class on which the attribute was found; used in the
     warning text only.
    :param key: attribute name; used in the warning text only.
    :param c: the attribute value.  Nothing happens unless it is an
     ``expression.ColumnElement``.

    """
    if not isinstance(c, expression.ColumnElement):
        return

    message = (
        f"Attribute '{key}' on class {cls} appears to "
        "be a non-schema SQLAlchemy expression "
        "object; this won't be part of the declarative mapping. "
        "To map arbitrary expressions, use ``column_property()`` "
        "or a similar function such as ``deferred()``, "
        "``query_expression()`` etc. "
    )
    util.warn(message)
1356
def _produce_column_copies(
    self,
    attributes_for_class: Callable[
        [], Iterable[Tuple[str, Any, Any, bool]]
    ],
    attribute_is_overridden: Callable[[str, Any], bool],
    fixed_table: bool,
    originating_class: Type[Any],
) -> Dict[str, Union[Column[Any], MappedColumn[Any]]]:
    """Place column attributes from ``originating_class`` onto ``self.cls``.

    Iterates the ``(name, obj, annotation, is_dataclass)`` tuples produced
    by ``attributes_for_class()`` and handles two cases: annotation-only
    attributes (``obj is None``) and ``Column`` / ``MappedColumn``
    objects.  Anything else is ignored.

    :param attributes_for_class: callable yielding the attribute tuples.
    :param attribute_is_overridden: callable given ``(name, obj)``; a true
     result skips the attribute, including its annotation.
    :param fixed_table: when true, the annotation-only path is disabled.
    :param originating_class: class the attributes come from.
    :return: dictionary of the attribute names set on ``self.cls`` by this
     call, mapped to the object that was set.
    :raises sqlalchemy.exc.InvalidRequestError: if a column to be copied
     has a foreign key whose ``_table_column`` is set but not yet bound
     to a table.

    """
    cls = self.cls
    dict_ = self.clsdict_view
    locally_collected_attributes = {}
    column_copies = self.column_copies
    # copy mixin columns to the mapped class

    for name, obj, annotation, is_dataclass in attributes_for_class():
        if (
            not fixed_table
            and obj is None
            and _is_mapped_annotation(annotation, cls, originating_class)
        ):
            # obj is None means this is the annotation only path

            if attribute_is_overridden(name, obj):
                # perform same "overridden" check as we do for
                # Column/MappedColumn, this is how a mixin col is not
                # applied to an inherited subclass that does not have
                # the mixin. the anno-only path added here for
                # #9564
                continue

            collected_annotation = self._collect_annotation(
                name, annotation, originating_class, True, obj
            )
            # the collected annotation may carry a value of its own;
            # prefer it over ``obj`` when an annotation was collected
            obj = (
                collected_annotation.attr_value
                if collected_annotation is not None
                else obj
            )
            if obj is None:
                # nothing supplied a construct; use a default one
                obj = MappedColumn()

            locally_collected_attributes[name] = obj
            setattr(cls, name, obj)

        elif isinstance(obj, (Column, MappedColumn)):
            if attribute_is_overridden(name, obj):
                # if column has been overridden
                # (like by the InstrumentedAttribute of the
                # superclass), skip. don't collect the annotation
                # either (issue #8718)
                continue

            collected_annotation = self._collect_annotation(
                name, annotation, originating_class, True, obj
            )
            obj = (
                collected_annotation.attr_value
                if collected_annotation is not None
                else obj
            )

            # copy only when the class does not define the name itself
            # and a class-level __table__, if any, does not already
            # contain a column under the column's name (or the
            # attribute name when the column has no name)
            if name not in dict_ and not (
                "__table__" in dict_
                and (getattr(obj, "name", None) or name)
                in dict_["__table__"].c
            ):
                if obj.foreign_keys:
                    for fk in obj.foreign_keys:
                        if (
                            fk._table_column is not None
                            and fk._table_column.table is None
                        ):
                            raise exc.InvalidRequestError(
                                "Columns with foreign keys to "
                                "non-table-bound "
                                "columns must be declared as "
                                "@declared_attr callables "
                                "on declarative mixin classes. "
                                "For dataclass "
                                "field() objects, use a lambda:."
                            )

                # remember original -> copy so that later references to
                # the original can be swapped for the copy
                column_copies[obj] = copy_ = obj._copy()

                locally_collected_attributes[name] = copy_
                setattr(cls, name, copy_)

    return locally_collected_attributes
1446
def _extract_mappable_attributes(self) -> None:
    """Filter ``self.collected_attributes`` into ``self.properties``.

    Each collected attribute (other than the names in
    ``self._include_dunders``) is examined in turn.  Values that are not
    mappable are removed from ``collected_attributes`` and, in most cases,
    set directly on the class; mappable values have their annotations
    interpreted via ``declarative_scan()`` where applicable and are then
    stored in ``self.properties`` under the attribute name.

    :raises sqlalchemy.exc.InvalidRequestError: if a mappable value is
     assigned to the reserved name ``metadata``.
    :raises sqlalchemy.orm.exc.MappedAnnotationError: if
     ``declarative_scan()`` raises ``NameError``.
    :raises sqlalchemy.exc.ArgumentError: if an attribute carries
     dataclass arguments while the class has no dataclass configuration.

    """
    cls = self.cls
    collected_attributes = self.collected_attributes

    our_stuff = self.properties

    _include_dunders = self._include_dunders

    late_mapped = _get_immediate_cls_attr(
        cls, "_sa_decl_prepare_nocascade", strict=True
    )

    allow_unmapped_annotations = self.allow_unmapped_annotations
    expect_annotations_wo_mapped = (
        allow_unmapped_annotations or self.is_dataclass_prior_to_mapping
    )

    look_for_dataclass_things = bool(self.dataclass_setup_arguments)

    # iterate over a copy of the keys; entries are popped inside the loop
    for k in list(collected_attributes):
        if k in _include_dunders:
            continue

        value = collected_attributes[k]

        if _is_declarative_props(value):
            # @declared_attr in collected_attributes only occurs here for a
            # @declared_attr that's directly on the mapped class;
            # for a mixin, these have already been evaluated
            if value._cascading:
                util.warn(
                    "Use of @declared_attr.cascading only applies to "
                    "Declarative 'mixin' and 'abstract' classes. "
                    "Currently, this flag is ignored on mapped class "
                    "%s" % self.cls
                )

            # replace the declared_attr with what class-level attribute
            # access returns for it
            value = getattr(cls, k)

        elif (
            isinstance(value, QueryableAttribute)
            and value.class_ is not cls
            and value.key != k
        ):
            # detect a QueryableAttribute that's already mapped being
            # assigned elsewhere in userland, turn into a synonym()
            value = SynonymProperty(value.key)
            setattr(cls, k, value)

        if (
            isinstance(value, tuple)
            and len(value) == 1
            and isinstance(value[0], (Column, _MappedAttribute))
        ):
            util.warn(
                "Ignoring declarative-like tuple value of attribute "
                "'%s': possibly a copy-and-paste error with a comma "
                "accidentally placed at the end of the line?" % k
            )
            continue
        elif look_for_dataclass_things and isinstance(
            value, dataclasses.Field
        ):
            # we collected a dataclass Field; dataclasses would have
            # set up the correct state on the class
            continue
        elif not isinstance(value, (Column, _DCAttributeOptions)):
            # using @declared_attr for some object that
            # isn't Column/MapperProperty/_DCAttributeOptions; remove
            # from the clsdict_view
            # and place the evaluated value onto the class.
            collected_attributes.pop(k)
            self._warn_for_decl_attributes(cls, k, value)
            if not late_mapped:
                setattr(cls, k, value)
            continue
        # we expect to see the name 'metadata' in some valid cases;
        # however at this point we see it's assigned to something trying
        # to be mapped, so raise for that.
        # TODO: should "registry" here be also? might be too late
        # to change that now (2.0 betas)
        elif k in ("metadata",):
            raise exc.InvalidRequestError(
                f"Attribute name '{k}' is reserved when using the "
                "Declarative API."
            )
        elif isinstance(value, Column):
            # name the column after the attribute; when a copy of this
            # column was recorded, the copy is the one that gets named
            _undefer_column_name(
                k, self.column_copies.get(value, value)  # type: ignore
            )
        else:
            if isinstance(value, _IntrospectsAnnotations):
                # unpack the collected annotation for this name, or a
                # tuple of "nothing collected" placeholders
                (
                    annotation,
                    mapped_container,
                    extracted_mapped_annotation,
                    is_dataclass,
                    attr_value,
                    originating_module,
                    originating_class,
                ) = self.collected_annotations.get(
                    k, (None, None, None, False, None, None, None)
                )

                # issue #8692 - don't do any annotation interpretation if
                # an annotation were present and a container such as
                # Mapped[] etc. were not used. If annotation is None,
                # do declarative_scan so that the property can raise
                # for required
                if (
                    mapped_container is not None
                    or annotation is None
                    # issue #10516: need to do declarative_scan even with
                    # a non-Mapped annotation if we are doing
                    # __allow_unmapped__, for things like col.name
                    # assignment
                    or allow_unmapped_annotations
                ):
                    try:
                        value.declarative_scan(
                            self,
                            self.registry,
                            cls,
                            originating_module,
                            k,
                            mapped_container,
                            annotation,
                            extracted_mapped_annotation,
                            is_dataclass,
                        )
                    except NameError as ne:
                        raise orm_exc.MappedAnnotationError(
                            f"Could not resolve all types within mapped "
                            f'annotation: "{annotation}". Ensure all '
                            f"types are written correctly and are "
                            f"imported within the module in use."
                        ) from ne
                else:
                    # assert that we were expecting annotations
                    # without Mapped[] were going to be passed.
                    # otherwise an error should have been raised
                    # by util._extract_mapped_subtype before we got here.
                    assert expect_annotations_wo_mapped

            if isinstance(value, _DCAttributeOptions):
                if (
                    value._has_dataclass_arguments
                    and not look_for_dataclass_things
                ):
                    # dataclass-only arguments were given but the class is
                    # not configured as a dataclass; work out which
                    # arguments were set so the error can name them
                    if isinstance(value, MapperProperty):
                        argnames = [
                            "init",
                            "default_factory",
                            "repr",
                            "default",
                        ]
                    else:
                        argnames = ["init", "default_factory", "repr"]

                    args = {
                        a
                        for a in argnames
                        if getattr(
                            value._attribute_options, f"dataclasses_{a}"
                        )
                        is not _NoArg.NO_ARG
                    }

                    raise exc.ArgumentError(
                        f"Attribute '{k}' on class {cls} includes "
                        f"dataclasses argument(s): "
                        f"{', '.join(sorted(repr(a) for a in args))} but "
                        f"class does not specify "
                        "SQLAlchemy native dataclass configuration."
                    )

                if not isinstance(value, (MapperProperty, _MapsColumns)):
                    # filter for _DCAttributeOptions objects that aren't
                    # MapperProperty / mapped_column(). Currently this
                    # includes AssociationProxy. pop it from the things
                    # we're going to map and set it up as a descriptor
                    # on the class.
                    collected_attributes.pop(k)

                    # Assoc Prox (or other descriptor object that may
                    # use _DCAttributeOptions) is usually here, except if
                    # 1. we're a
                    # dataclass, dataclasses would have removed the
                    # attr here or 2. assoc proxy is coming from a
                    # superclass, we want it to be direct here so it
                    # tracks state or 3. assoc prox comes from
                    # declared_attr, uncommon case
                    setattr(cls, k, value)
                    continue

        # everything that was not skipped above is to be mapped
        our_stuff[k] = value
1643
def _extract_declared_columns(self) -> None:
    """Pull column objects out of ``self.properties``.

    Columns found directly, or supplied by ``_MapsColumns`` objects, are
    added to ``self.declared_columns``; sort orders supplied by
    ``_MapsColumns`` objects are recorded in ``self.column_ordering``.
    Entries of ``self.properties`` are replaced or removed along the way,
    and a warning is emitted for each column name that more than one
    attribute key referred to.

    """
    props = self.properties
    declared_columns = self.declared_columns
    column_ordering = self.column_ordering

    # column name -> attribute keys under which that name was seen
    keys_by_column_name = collections.defaultdict(set)

    # extract columns from the class dict
    for attr_key, prop in list(props.items()):
        if isinstance(prop, _MapsColumns):
            assigned_prop = prop.mapper_property_to_assign
            if not assigned_prop:
                # if no mapper property to assign, this currently means
                # this is a MappedColumn that will produce a Column for us
                del props[attr_key]
            else:
                props[attr_key] = assigned_prop

            track_name = not isinstance(prop, CompositeProperty)
            for column, sort_order in prop.columns_to_assign:
                if track_name:
                    keys_by_column_name[column.name].add(attr_key)
                declared_columns.add(column)

                # we would assert this, however we want the below
                # warning to take effect instead. See #9630
                # assert col not in column_ordering
                column_ordering[column] = sort_order

                # if this is a MappedColumn and the attribute key we
                # have is not what the column has for its key, map the
                # Column explicitly under the attribute key name.
                # otherwise, Mapper will map it under the column key.
                if assigned_prop is None and attr_key != column.key:
                    props[attr_key] = column
            continue

        if not isinstance(prop, Column):
            continue

        # undefer previously occurred here, and now occurs earlier.
        # ensure every column we get here has been named
        assert prop.name is not None
        keys_by_column_name[prop.name].add(attr_key)
        declared_columns.add(prop)

        # if the column is the same name as the key,
        # remove it from the explicit properties dict.
        # the normal rules for assigning column-based properties
        # will take over, including precedence of columns
        # in multi-column ColumnProperties.
        if attr_key == prop.key:
            del props[attr_key]

    for column_name, attr_keys in keys_by_column_name.items():
        if len(attr_keys) <= 1:
            continue
        util.warn(
            "On class %r, Column object %r named "
            "directly multiple times, "
            "only one will be used: %s. "
            "Consider using orm.synonym instead"
            % (self.classname, column_name, (", ".join(sorted(attr_keys))))
        )
1702
def _setup_table(self, table: Optional[FromClause] = None) -> None:
    """Establish ``self.local_table`` for the class being mapped.

    When no table is supplied, no ``self.table_fn`` is set and the class
    does not define ``__table__`` itself, a new table is constructed from
    ``self.tablename``, ``self.table_args`` and ``self.declared_columns``
    (if ``self.tablename`` is not ``None``) and assigned to the class as
    ``__table__``.  Otherwise the supplied table, the result of
    ``self.table_fn()``, or the class' ``__table__`` is used, and every
    declared column must already be part of it.

    :param table: an existing table to use, or ``None``.
    :raises sqlalchemy.exc.ArgumentError: if a declared column is not
     contained in a pre-existing table.

    """
    cls = self.cls
    cls_as_Decl = cast("MappedClassProtocol[Any]", cls)

    tablename = self.tablename
    table_args = self.table_args
    clsdict_view = self.clsdict_view
    declared_columns = self.declared_columns
    column_ordering = self.column_ordering

    manager = attributes.manager_of_class(cls)

    if (
        self.table_fn is None
        and "__table__" not in clsdict_view
        and table is None
    ):
        # the class may supply its own table factory
        if hasattr(cls, "__table_cls__"):
            table_cls = cast(
                Type[Table],
                util.unbound_method_to_callable(cls.__table_cls__),  # type: ignore # noqa: E501
            )
        else:
            table_cls = Table

        if tablename is not None:
            args: Tuple[Any, ...] = ()
            table_kw: Dict[str, Any] = {}

            if table_args:
                # keyword arguments are copied rather than aliased: the
                # autoload keys added below must not be written into the
                # __table_args__ object itself, which may be shared by
                # several classes (e.g. when it comes from a mixin)
                if isinstance(table_args, dict):
                    table_kw = dict(table_args)
                elif isinstance(table_args, tuple):
                    if isinstance(table_args[-1], dict):
                        args = table_args[0:-1]
                        table_kw = dict(table_args[-1])
                    else:
                        args = table_args

            autoload_with = clsdict_view.get("__autoload_with__")
            if autoload_with:
                table_kw["autoload_with"] = autoload_with

            autoload = clsdict_view.get("__autoload__")
            if autoload:
                table_kw["autoload"] = True

            # columns without a recorded ordering sort as 0; sorted() is
            # stable so ties keep the iteration order of declared_columns
            sorted_columns = sorted(
                declared_columns,
                key=lambda c: column_ordering.get(c, 0),
            )
            table = self.set_cls_attribute(
                "__table__",
                table_cls(
                    tablename,
                    self._metadata_for_cls(manager),
                    *sorted_columns,
                    *args,
                    **table_kw,
                ),
            )
    else:
        if table is None:
            if self.table_fn:
                table = self.set_cls_attribute(
                    "__table__", self.table_fn()
                )
            else:
                table = cls_as_Decl.__table__
        if declared_columns:
            # with a pre-existing table, declared columns may only refer
            # to columns that the table already contains
            for c in declared_columns:
                if not table.c.contains_column(c):
                    raise exc.ArgumentError(
                        "Can't add additional column %r when "
                        "specifying __table__" % c.key
                    )

    self.local_table = table
1780
1781 def _metadata_for_cls(self, manager: ClassManager[Any]) -> MetaData:
1782 meta: Optional[MetaData] = getattr(self.cls, "metadata", None)
1783 if meta is not None:
1784 return meta
1785 else:
1786 return manager.registry.metadata
1787
def _setup_inheriting_mapper(self, mapper_kw: _MapperKwArgs) -> None:
    """Determine the superclass this class inherits from, if any.

    Stores the result in ``self.inherits`` and sets ``self.single`` to
    ``True`` when the class defines neither ``__table__`` nor a table
    name.

    :param mapper_kw: mapper keyword arguments; an ``"inherits"`` entry
     takes precedence over searching the bases of the class.
    :raises sqlalchemy.exc.InvalidRequestError: if more than one base
     class qualifies as the superclass to inherit from.

    """
    cls = self.cls
    inherits = mapper_kw.get("inherits", None)

    if isinstance(inherits, Mapper):
        inherits = inherits.class_
    elif inherits is None:
        # since we search for classical mappings now, search for
        # multiple mapped bases as well and raise an error.
        candidates = []
        for base in cls.__bases__:
            resolved = _resolve_for_abstract_or_classical(base)
            if (
                resolved is not None
                and _is_supercls_for_inherits(resolved)
                and resolved not in candidates
            ):
                candidates.append(resolved)

        if len(candidates) > 1:
            raise exc.InvalidRequestError(
                "Class %s has multiple mapped bases: %r" % (cls, candidates)
            )
        if candidates:
            inherits = candidates[0]

    self.inherits = inherits

    clsdict_view = self.clsdict_view
    if "__table__" not in clsdict_view and self.tablename is None:
        self.single = True
1820
def _setup_inheriting_columns(self, mapper_kw: _MapperKwArgs) -> None:
    """Validate the table setup and handle single-table inheritance.

    A class with neither a local table nor a superclass to inherit from
    is rejected unless it sets ``__no_table__``.  For an inheriting class
    without a table of its own, the declared columns are appended to the
    table of the superclass.

    :param mapper_kw: mapper keyword arguments; not used by this method.
    :raises sqlalchemy.exc.InvalidRequestError: if there is no table and
     nothing to inherit from.
    :raises sqlalchemy.exc.ArgumentError: for single-table inheritance, if
     table arguments are present, the superclass is not mapped to a
     ``Table`` while columns are declared, a declared column conflicts
     with a different existing column of the same name, or a declared
     column is a primary key column.

    """
    table = self.local_table
    cls = self.cls
    table_args = self.table_args
    declared_columns = self.declared_columns

    if (
        table is None
        and self.inherits is None
        and not _get_immediate_cls_attr(cls, "__no_table__")
    ):
        raise exc.InvalidRequestError(
            "Class %r does not have a __table__ or __tablename__ "
            "specified and does not inherit from an existing "
            "table-mapped class." % cls
        )
    elif self.inherits:
        inherited_mapper_or_config = _declared_mapping_info(self.inherits)
        assert inherited_mapper_or_config is not None
        inherited_table = inherited_mapper_or_config.local_table
        inherited_persist_selectable = (
            inherited_mapper_or_config.persist_selectable
        )

        if table is None:
            # single table inheritance.
            # ensure no table args
            if table_args:
                raise exc.ArgumentError(
                    "Can't place __table_args__ on an inherited class "
                    "with no table."
                )

            # add any columns declared here to the inherited table.
            if declared_columns and not isinstance(inherited_table, Table):
                raise exc.ArgumentError(
                    f"Can't declare columns on single-table-inherited "
                    f"subclass {self.cls}; superclass {self.inherits} "
                    "is not mapped to a Table"
                )

            for col in declared_columns:
                assert inherited_table is not None
                if col.name in inherited_table.c:
                    # the very same column object is already present;
                    # nothing to add
                    if inherited_table.c[col.name] is col:
                        continue
                    raise exc.ArgumentError(
                        f"Column '{col}' on class {cls.__name__} "
                        f"conflicts with existing column "
                        f"'{inherited_table.c[col.name]}'. If using "
                        f"Declarative, consider using the "
                        "use_existing_column parameter of mapped_column() "
                        "to resolve conflicts."
                    )
                if col.primary_key:
                    raise exc.ArgumentError(
                        "Can't place primary key columns on an inherited "
                        "class with no table."
                    )

                # narrows the type for static checkers only; never
                # executed at runtime
                if TYPE_CHECKING:
                    assert isinstance(inherited_table, Table)

                inherited_table.append_column(col)
                # when the superclass persists to a selectable other than
                # its local table, tell that selectable about the new
                # column as well
                if (
                    inherited_persist_selectable is not None
                    and inherited_persist_selectable is not inherited_table
                ):
                    inherited_persist_selectable._refresh_for_new_column(
                        col
                    )
1892
def _prepare_mapper_arguments(self, mapper_kw: _MapperKwArgs) -> None:
    """Assemble the keyword arguments for the mapper into ``self.mapper_args``.

    Starts from the result of ``self.mapper_args_fn()`` (or an empty
    dictionary), overlays ``mapper_kw``, substitutes recorded column
    copies for original columns, fills in ``inherits`` and
    ``exclude_properties`` for inheriting mappings, and stores a copy of
    the result, including a ``"properties"`` entry, on ``self``.

    :param mapper_kw: additional mapper keyword arguments; they override
     entries of the same name from ``self.mapper_args_fn()``.
    :raises sqlalchemy.exc.InvalidRequestError: if an ``inherits``
     argument is given that is not the class in ``self.inherits``.

    """
    properties = self.properties

    if self.mapper_args_fn:
        mapper_args = self.mapper_args_fn()
    else:
        mapper_args = {}

    if mapper_kw:
        mapper_args.update(mapper_kw)

    if "properties" in mapper_args:
        # work on a copy so that explicitly passed properties are not
        # written into self.properties
        properties = dict(properties)
        properties.update(mapper_args["properties"])

    # make sure that column copies are used rather
    # than the original columns from any mixins
    for k in ("version_id_col", "polymorphic_on"):
        if k in mapper_args:
            v = mapper_args[k]
            mapper_args[k] = self.column_copies.get(v, v)

    if "primary_key" in mapper_args:
        mapper_args["primary_key"] = [
            self.column_copies.get(v, v)
            for v in util.to_list(mapper_args["primary_key"])
        ]

    if "inherits" in mapper_args:
        inherits_arg = mapper_args["inherits"]
        if isinstance(inherits_arg, Mapper):
            inherits_arg = inherits_arg.class_

        # an explicit inherits argument must agree with self.inherits
        if inherits_arg is not self.inherits:
            raise exc.InvalidRequestError(
                "mapper inherits argument given for non-inheriting "
                "class %s" % (mapper_args["inherits"])
            )

    if self.inherits:
        mapper_args["inherits"] = self.inherits

    if self.inherits and not mapper_args.get("concrete", False):
        # note the superclass is expected to have a Mapper assigned and
        # not be a deferred config, as this is called within map()
        inherited_mapper = class_mapper(self.inherits, False)
        inherited_table = inherited_mapper.local_table

        # single or joined inheritance
        # exclude any cols on the inherited table which are
        # not mapped on the parent class, to avoid
        # mapping columns specific to sibling/nephew classes
        if "exclude_properties" not in mapper_args:
            mapper_args["exclude_properties"] = exclude_properties = {
                c.key
                for c in inherited_table.c
                if c not in inherited_mapper._columntoproperty
            }.union(inherited_mapper.exclude_properties or ())
            # columns declared on this class are never excluded
            exclude_properties.difference_update(
                [c.key for c in self.declared_columns]
            )

        # look through columns in the current mapper that
        # are keyed to a propname different than the colname
        # (if names were the same, we'd have popped it out above,
        # in which case the mapper makes this combination).
        # See if the superclass has a similar column property.
        # If so, join them together.
        for k, col in list(properties.items()):
            if not isinstance(col, expression.ColumnElement):
                continue
            if k in inherited_mapper._props:
                p = inherited_mapper._props[k]
                if isinstance(p, ColumnProperty):
                    # note here we place the subclass column
                    # first. See [ticket:1892] for background.
                    properties[k] = [col] + p.columns
    # store a copy so that the "properties" entry is not written into the
    # dictionary obtained from mapper_args_fn()
    result_mapper_args = mapper_args.copy()
    result_mapper_args["properties"] = properties
    self.mapper_args = result_mapper_args
1973
def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]:
    """Construct the mapper for ``self.cls`` and assign it as ``__mapper__``.

    :param mapper_kw: extra mapper keyword arguments, passed to
     ``_prepare_mapper_arguments()``.
    :return: the value returned by ``set_cls_attribute()`` for the newly
     constructed mapper.

    """
    self._prepare_mapper_arguments(mapper_kw)

    target_cls = self.cls
    if not hasattr(target_cls, "__mapper_cls__"):
        mapper_cls = Mapper
    else:
        # the class supplies its own mapper factory
        mapper_cls = cast(
            "Type[Mapper[Any]]",
            util.unbound_method_to_callable(
                target_cls.__mapper_cls__  # type: ignore
            ),
        )

    new_mapper = mapper_cls(self.cls, self.local_table, **self.mapper_args)
    return self.set_cls_attribute("__mapper__", new_mapper)
1990
1991
@util.preload_module("sqlalchemy.orm.decl_api")
def _as_dc_declaredattr(
    field_metadata: Mapping[str, Any], sa_dataclass_metadata_key: str
) -> Any:
    """Return the ORM entry of a dataclass field's metadata.

    A callable entry that is not already a ``declared_attr`` is wrapped in
    one; anything else is returned as is.

    :param field_metadata: metadata mapping of a dataclass field.
    :param sa_dataclass_metadata_key: key under which the ORM entry is
     stored in ``field_metadata``.

    """
    # wrap lambdas inside dataclass fields inside an ad-hoc declared_attr.
    # we can't write it because field.metadata is immutable :( so we have
    # to go through extra trouble to compare these
    declared_attr = util.preloaded.orm_decl_api.declared_attr
    entry = field_metadata[sa_dataclass_metadata_key]

    if isinstance(entry, declared_attr) or not callable(entry):
        return entry
    return declared_attr(entry)
2005
2006
class _DeferredMapperConfig(_ClassScanMapperConfig):
    """Mapper configuration that is registered now and mapped later.

    The target class is held through a weak reference, and every
    configuration is tracked in the class-level ``_configs`` registry,
    keyed by that weak reference, until :meth:`.map` is called or the
    target class is garbage collected.

    """

    _cls: weakref.ref[Type[Any]]

    is_deferred = True

    # weak reference to class -> its pending configuration
    _configs: util.OrderedDict[
        weakref.ref[Type[Any]], _DeferredMapperConfig
    ] = util.OrderedDict()

    def _early_mapping(self, mapper_kw: _MapperKwArgs) -> None:
        """Do nothing; this configuration does not map early."""

    # mypy disallows plain property override of variable
    @property  # type: ignore
    def cls(self) -> Type[Any]:
        """The target class, dereferenced from the weak reference."""
        return self._cls()  # type: ignore

    @cls.setter
    def cls(self, class_: Type[Any]) -> None:
        # the weakref callback removes the registry entry once the class
        # goes away
        class_ref = weakref.ref(class_, self._remove_config_cls)
        self._cls = class_ref
        self._configs[class_ref] = self

    @classmethod
    def _remove_config_cls(cls, ref: weakref.ref[Type[Any]]) -> None:
        """Drop the registry entry for ``ref``, if there is one."""
        cls._configs.pop(ref, None)

    @classmethod
    def has_cls(cls, class_: Type[Any]) -> bool:
        """Return True if ``class_`` has a pending configuration."""
        # 2.6 fails on weakref if class_ is an old style class
        if not isinstance(class_, type):
            return False
        return weakref.ref(class_) in cls._configs

    @classmethod
    def raise_unmapped_for_cls(cls, class_: Type[Any]) -> NoReturn:
        """Raise for use of ``class_`` while its mapping is deferred.

        A ``_sa_raise_deferred_config`` hook on the class is invoked
        first when present.

        """
        if hasattr(class_, "_sa_raise_deferred_config"):
            class_._sa_raise_deferred_config()

        message = (
            f"Class {orm_exc._safe_cls_name(class_)} has a deferred "
            "mapping on it. It is not yet usable as a mapped class."
        )
        raise orm_exc.UnmappedClassError(class_, msg=message)

    @classmethod
    def config_for_cls(cls, class_: Type[Any]) -> _DeferredMapperConfig:
        """Return the pending configuration registered for ``class_``."""
        class_ref = weakref.ref(class_)
        return cls._configs[class_ref]

    @classmethod
    def classes_for_base(
        cls, base_cls: Type[Any], sort: bool = True
    ) -> List[_DeferredMapperConfig]:
        """Return pending configurations for subclasses of ``base_cls``.

        :param base_cls: only configurations whose class is a subclass of
         this class are returned.
        :param sort: when true, the result is ordered with
         ``topological.sort()`` using base class -> subclass pairs.

        """
        # dereference every weak reference first, then filter
        resolved = [(config, config.cls) for config in cls._configs.values()]
        matching = []
        for config, target in resolved:
            if target is not None and issubclass(target, base_cls):
                matching.append(config)

        if not sort:
            return matching

        config_by_class = {config.cls: config for config in matching}

        edges: List[Tuple[_DeferredMapperConfig, _DeferredMapperConfig]] = []
        for target, config in config_by_class.items():
            for parent in target.__bases__:
                if parent in config_by_class:
                    edges.append((config_by_class[parent], config))

        return list(topological.sort(edges, matching))

    def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]:
        """Remove this configuration from the registry, then map."""
        self._configs.pop(self._cls, None)
        return super().map(mapper_kw)
2082
2083
def _add_attribute(
    cls: Type[Any], key: str, value: MapperProperty[Any]
) -> None:
    """Assign ``value`` to ``cls`` under ``key``, mapping it if applicable.

    When ``cls`` does not carry a ``__mapper__`` in its own ``__dict__``
    this is a plain class-level attribute assignment.  Otherwise the value
    is dispatched on its type:

    * a ``Column`` takes ``key`` as its key / name where those are unset,
      is appended to the class's table and added to the mapper;
    * a ``_MapsColumns`` object has each of its columns appended to the
      table; its mapper property, or lacking one each column, is then
      added to the mapper;
    * a ``MapperProperty`` is added to the mapper as-is;
    * a ``QueryableAttribute`` whose own key differs from ``key`` is
      added as a ``SynonymProperty`` pointing at that key;
    * anything else is set as a plain attribute, after which the mapper's
      memoizations are expired.

    :raises InvalidRequestError: if a column has to be appended but
     ``cls.__table__`` is not a ``Table``.
    """
    if "__mapper__" not in cls.__dict__:
        # no mapper declared directly on this class: ordinary assignment
        type.__setattr__(cls, key, value)
        return

    mapped_cls = cast("MappedClassProtocol[Any]", cls)

    def _require_table(mc: MappedClassProtocol[Any]) -> Table:
        if not isinstance(mc.__table__, Table):
            raise exc.InvalidRequestError(
                f"Cannot add a new attribute to mapped class {mc.__name__!r} "
                "because it's not mapped against a table."
            )
        return mc.__table__

    if isinstance(value, Column):
        _undefer_column_name(key, value)
        table = _require_table(mapped_cls)
        table.append_column(value, replace_existing=True)
        mapped_cls.__mapper__.add_property(key, value)
    elif isinstance(value, _MapsColumns):
        prop = value.mapper_property_to_assign
        for column, _ in value.columns_to_assign:
            _undefer_column_name(key, column)
            # resolved per column so that an object contributing no
            # columns never triggers the "not a table" error
            table = _require_table(mapped_cls)
            table.append_column(column, replace_existing=True)
            if not prop:
                mapped_cls.__mapper__.add_property(key, column)
        if prop:
            mapped_cls.__mapper__.add_property(key, prop)
    elif isinstance(value, MapperProperty):
        mapped_cls.__mapper__.add_property(key, value)
    elif isinstance(value, QueryableAttribute) and value.key != key:
        # an already-mapped attribute assigned under another name in
        # userland becomes a synonym() for the original
        synonym = SynonymProperty(value.key)
        mapped_cls.__mapper__.add_property(key, synonym)
    else:
        type.__setattr__(cls, key, value)
        # looked up after the assignment on purpose: the attribute just
        # set may be ``__mapper__`` itself
        mapped_cls.__mapper__._expire_memoizations()
2134
2135
def _del_attribute(cls: Type[Any], key: str) -> None:
    """Delete the class-level attribute ``key`` from ``cls``.

    The deletion is a plain ``type.__delattr__`` unless ``cls`` has its
    own ``__mapper__``, ``key`` is present in the class ``__dict__`` and
    the mapper's ``_dispose_called`` flag is false.  In that case mapped
    constructs are refused, and any other attribute is deleted and the
    mapper's memoizations are expired afterwards.

    :raises NotImplementedError: if the attribute is a ``Column``,
     ``_MapsColumns``, ``MapperProperty`` or ``QueryableAttribute`` on a
     class whose mapper has not been disposed.
    """
    namespace = cls.__dict__
    if "__mapper__" not in namespace or key not in namespace:
        type.__delattr__(cls, key)
        return

    mapped_cls = cast("MappedClassProtocol[Any]", cls)
    if mapped_cls.__mapper__._dispose_called:
        type.__delattr__(cls, key)
        return

    existing = namespace[key]
    if isinstance(
        existing, (Column, _MapsColumns, MapperProperty, QueryableAttribute)
    ):
        raise NotImplementedError(
            "Can't un-map individual mapped attributes on a mapped class."
        )

    type.__delattr__(cls, key)
    mapped_cls.__mapper__._expire_memoizations()
2158
2159
def _declarative_constructor(self: Any, **kwargs: Any) -> None:
    """Initialize an instance from keyword arguments.

    Each keyword is set as an attribute on ``self``, provided the
    instance's class already has an attribute of that name (for example
    a mapped column or relationship).

    Keywords are processed in the order given; those preceding an
    invalid one have already been applied when the error is raised.

    :raises TypeError: if a keyword does not name an attribute of the
     instance's class.
    """
    owner = type(self)
    for name, val in kwargs.items():
        if hasattr(owner, name):
            setattr(self, name, val)
            continue
        raise TypeError(
            "%r is an invalid keyword argument for %s" % (name, owner.__name__)
        )


_declarative_constructor.__name__ = "__init__"
2180
2181
def _undefer_column_name(key: str, column: Column[Any]) -> None:
    """Fill in ``column.key`` and ``column.name`` from ``key`` when unset.

    Each of the two attributes is checked independently and is only
    assigned when it is currently ``None``; values already present are
    left untouched.
    """
    for attr in ("key", "name"):
        if getattr(column, attr) is None:
            setattr(column, attr, key)