1# orm/decl_base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Internal implementation for declarative."""
9
10from __future__ import annotations
11
12import collections
13import dataclasses
14import re
15from typing import Any
16from typing import Callable
17from typing import cast
18from typing import Dict
19from typing import Iterable
20from typing import List
21from typing import Mapping
22from typing import NamedTuple
23from typing import NoReturn
24from typing import Optional
25from typing import Protocol
26from typing import Sequence
27from typing import Tuple
28from typing import Type
29from typing import TYPE_CHECKING
30from typing import TypeVar
31from typing import Union
32import weakref
33
34from . import attributes
35from . import clsregistry
36from . import exc as orm_exc
37from . import instrumentation
38from . import mapperlib
39from ._typing import _O
40from ._typing import attr_is_internal_proxy
41from .attributes import InstrumentedAttribute
42from .attributes import QueryableAttribute
43from .base import _is_mapped_class
44from .base import InspectionAttr
45from .descriptor_props import CompositeProperty
46from .descriptor_props import SynonymProperty
47from .interfaces import _AttributeOptions
48from .interfaces import _DataclassArguments
49from .interfaces import _DCAttributeOptions
50from .interfaces import _IntrospectsAnnotations
51from .interfaces import _MappedAttribute
52from .interfaces import _MapsColumns
53from .interfaces import MapperProperty
54from .mapper import Mapper
55from .properties import ColumnProperty
56from .properties import MappedColumn
57from .util import _extract_mapped_subtype
58from .util import _is_mapped_annotation
59from .util import class_mapper
60from .util import de_stringify_annotation
61from .. import event
62from .. import exc
63from .. import util
64from ..sql import expression
65from ..sql.base import _NoArg
66from ..sql.schema import Column
67from ..sql.schema import Table
68from ..util import topological
69from ..util.typing import _AnnotationScanType
70from ..util.typing import get_args
71from ..util.typing import is_fwd_ref
72from ..util.typing import is_literal
73
74if TYPE_CHECKING:
75 from ._typing import _ClassDict
76 from ._typing import _RegistryType
77 from .base import Mapped
78 from .decl_api import declared_attr
79 from .instrumentation import ClassManager
80 from ..sql.elements import NamedColumn
81 from ..sql.schema import MetaData
82 from ..sql.selectable import FromClause
83
84_T = TypeVar("_T", bound=Any)
85
86_MapperKwArgs = Mapping[str, Any]
87_TableArgsType = Union[Tuple[Any, ...], Dict[str, Any]]
88
89
class MappedClassProtocol(Protocol[_O]):
    """A protocol representing a SQLAlchemy mapped class.

    The protocol is generic on the type of class, use
    ``MappedClassProtocol[Any]`` to allow any mapped class.

    The protocol declares the class-level ``__name__``, ``__mapper__`` and
    ``__table__`` attributes, as well as a keyword-argument call signature
    that returns an instance of the mapped type.
    """

    # name of the class
    __name__: str
    # the Mapper established for the class
    __mapper__: Mapper[_O]
    # the FromClause established as ``__table__`` on the class
    __table__: FromClause

    def __call__(self, **kw: Any) -> _O: ...
102
103
class _DeclMappedClassProtocol(MappedClassProtocol[_O], Protocol):
    """Internal more detailed version of ``MappedClassProtocol``.

    Adds the declarative directives which this module reads from a class
    (``__tablename__``, ``__mapper_args__``, ``__table_args__``), the
    ``_sa_apply_dc_transforms`` dataclass arguments, and the
    ``__declare_first__`` / ``__declare_last__`` hooks.
    """

    metadata: MetaData
    __tablename__: str
    __mapper_args__: _MapperKwArgs
    __table_args__: Optional[_TableArgsType]

    # dataclass arguments for the class, or None; a truthy value causes the
    # class scan to run its dataclass transformation step
    _sa_apply_dc_transforms: Optional[_DataclassArguments]

    # hook invoked from a Mapper "before_configured" event listener
    def __declare_first__(self) -> None: ...

    # hook invoked from a Mapper "after_configured" event listener
    def __declare_last__(self) -> None: ...
117
118
def _declared_mapping_info(
    cls: Type[Any],
) -> Optional[Union[_DeferredMapperConfig, Mapper[Any]]]:
    """Return the mapping information currently known for ``cls``.

    :param cls: the class to look up.
    :return: the ``_DeferredMapperConfig`` for the class if its mapping is
     deferred, the ``Mapper`` if the class is mapped (obtained without
     triggering mapper configuration), otherwise ``None``.
    """
    if _DeferredMapperConfig.has_cls(cls):
        # deferred mapping; hand back the pending configuration
        return _DeferredMapperConfig.config_for_cls(cls)

    if _is_mapped_class(cls):
        # regular mapping
        return class_mapper(cls, configure=False)

    return None
130
131
def _is_supercls_for_inherits(cls: Type[Any]) -> bool:
    """Tell whether ``cls`` is a superclass that should be assigned to the
    'inherits' mapper argument.

    Classes with a deferred mapper config that aren't mapped yet qualify,
    except for classes with _sa_decl_prepare_nocascade (e.g.
    ``AbstractConcreteBase``); these concrete-only classes are not set up as
    "inherits" until after mappers are configured using
    mapper._set_concrete_base()

    """
    if not _DeferredMapperConfig.has_cls(cls):
        # no deferred config present; only a regular mapping qualifies
        return bool(_is_mapped_class(cls))

    nocascade = _get_immediate_cls_attr(
        cls, "_sa_decl_prepare_nocascade", strict=True
    )
    return not nocascade
152
153
154def _resolve_for_abstract_or_classical(cls: Type[Any]) -> Optional[Type[Any]]:
155 if cls is object:
156 return None
157
158 sup: Optional[Type[Any]]
159
160 if cls.__dict__.get("__abstract__", False):
161 for base_ in cls.__bases__:
162 sup = _resolve_for_abstract_or_classical(base_)
163 if sup is not None:
164 return sup
165 else:
166 return None
167 else:
168 clsmanager = _dive_for_cls_manager(cls)
169
170 if clsmanager:
171 return clsmanager.class_
172 else:
173 return cls
174
175
176def _get_immediate_cls_attr(
177 cls: Type[Any], attrname: str, strict: bool = False
178) -> Optional[Any]:
179 """return an attribute of the class that is either present directly
180 on the class, e.g. not on a superclass, or is from a superclass but
181 this superclass is a non-mapped mixin, that is, not a descendant of
182 the declarative base and is also not classically mapped.
183
184 This is used to detect attributes that indicate something about
185 a mapped class independently from any mapped classes that it may
186 inherit from.
187
188 """
189
190 # the rules are different for this name than others,
191 # make sure we've moved it out. transitional
192 assert attrname != "__abstract__"
193
194 if not issubclass(cls, object):
195 return None
196
197 if attrname in cls.__dict__:
198 return getattr(cls, attrname)
199
200 for base in cls.__mro__[1:]:
201 _is_classical_inherits = _dive_for_cls_manager(base) is not None
202
203 if attrname in base.__dict__ and (
204 base is cls
205 or (
206 (base in cls.__bases__ if strict else True)
207 and not _is_classical_inherits
208 )
209 ):
210 return getattr(base, attrname)
211 else:
212 return None
213
214
def _dive_for_cls_manager(cls: Type[_O]) -> Optional[ClassManager[_O]]:
    """Return the first class manager found along the ``__mro__`` of
    ``cls``, or ``None`` if no class in the hierarchy has one.
    """
    # class manager registration is pluggable, so each class in the
    # hierarchy is looked up individually instead of relying on a plain
    # "cls._sa_class_manager" attribute access
    managers = map(attributes.opt_manager_of_class, cls.__mro__)
    return next((mgr for mgr in managers if mgr), None)
227
228
def _as_declarative(
    registry: _RegistryType, cls: Type[Any], dict_: _ClassDict
) -> Optional[_MapperConfig]:
    """Set up a declarative mapping for ``cls``.

    :param registry: the registry to map the class with.
    :param cls: the class to be mapped.
    :param dict_: the class dictionary.
    :return: whatever ``_MapperConfig.setup_mapping()`` returns.
    """
    # declarative scans the class for attributes, so neither a table nor
    # mapper arguments are passed separately.
    return _MapperConfig.setup_mapping(
        registry, cls, dict_, table=None, mapper_kw={}
    )
235
236
def _mapper(
    registry: _RegistryType,
    cls: Type[_O],
    table: Optional[FromClause],
    mapper_kw: _MapperKwArgs,
) -> Mapper[_O]:
    """Map ``cls`` imperatively and return the resulting ``Mapper``.

    :param registry: the registry to map the class with.
    :param cls: the class to be mapped.
    :param table: the table to map to, if any.
    :param mapper_kw: keyword arguments for the ``Mapper``.
    :return: the ``__mapper__`` attribute of the class.
    """
    # constructing the config performs the mapping, which installs
    # ``__mapper__`` on the class
    _ImperativeMapperConfig(registry, cls, table, mapper_kw)

    mapped_cls = cast("MappedClassProtocol[_O]", cls)
    return mapped_cls.__mapper__
245
246
@util.preload_module("sqlalchemy.orm.decl_api")
def _is_declarative_props(obj: Any) -> bool:
    """Return True if ``obj`` is an instance of ``_declared_attr_common``
    or of ``util.classproperty``.
    """
    decl_api = util.preloaded.orm_decl_api
    declarative_types = (decl_api._declared_attr_common, util.classproperty)

    return isinstance(obj, declarative_types)
252
253
def _check_declared_props_nocascade(
    obj: Any, name: str, cls: Type[_O]
) -> bool:
    """Return True if ``obj`` is a declarative property, warning if it was
    set up as cascading.

    :param obj: the attribute value to check.
    :param name: attribute name, used in the warning message.
    :param cls: the class being scanned, used in the warning message.
    """
    if not _is_declarative_props(obj):
        return False

    if getattr(obj, "_cascading", False):
        util.warn(
            "@declared_attr.cascading is not supported on the %s "
            "attribute on class %s. This attribute invokes for "
            "subclasses in any case." % (name, cls)
        )

    return True
267
268
class _MapperConfig:
    """Base for the objects that configure the mapping of a single class.

    Constructing an instance registers the class with the instrumentation
    system; subclasses implement :meth:`.map` to produce the ``Mapper``.
    """

    __slots__ = (
        "cls",
        "classname",
        "properties",
        "declared_attr_reg",
        "__weakref__",
    )

    # the class being mapped
    cls: Type[Any]
    # ``__name__`` of the class being mapped
    classname: str
    # ordered collection of properties, keyed by attribute name
    properties: util.OrderedDict[
        str,
        Union[
            Sequence[NamedColumn[Any]], NamedColumn[Any], MapperProperty[Any]
        ],
    ]
    # registry of values keyed by declared_attr object; starts out empty
    declared_attr_reg: Dict[declared_attr[Any], Any]

    @classmethod
    def setup_mapping(
        cls,
        registry: _RegistryType,
        cls_: Type[_O],
        dict_: _ClassDict,
        table: Optional[FromClause],
        mapper_kw: _MapperKwArgs,
    ) -> Optional[_MapperConfig]:
        """Create the appropriate mapper config object for ``cls_``.

        :param registry: the registry to map the class with.
        :param cls_: the class to be mapped.
        :param dict_: the class dictionary.
        :param table: optional table to map to.
        :param mapper_kw: keyword arguments for the ``Mapper``.
        :return: ``None`` if the class has a truthy ``__abstract__`` in its
         own ``__dict__``; a ``_DeferredMapperConfig`` if the class has
         ``_sa_decl_prepare_nocascade`` or ``_sa_decl_prepare``; otherwise
         a ``_ClassScanMapperConfig``.
        :raises InvalidRequestError: if ``cls_`` already has a class
         manager of its own.
        """
        # note that within this classmethod ``cls`` is the config class;
        # the class being mapped is ``cls_``, which is the one that has to
        # be checked for existing instrumentation and named in the message
        manager = attributes.opt_manager_of_class(cls_)
        if manager and manager.class_ is cls_:
            raise exc.InvalidRequestError(
                f"Class {cls_!r} already has been instrumented declaratively"
            )

        if cls_.__dict__.get("__abstract__", False):
            return None

        defer_map = _get_immediate_cls_attr(
            cls_, "_sa_decl_prepare_nocascade", strict=True
        ) or hasattr(cls_, "_sa_decl_prepare")

        if defer_map:
            return _DeferredMapperConfig(
                registry, cls_, dict_, table, mapper_kw
            )
        else:
            return _ClassScanMapperConfig(
                registry, cls_, dict_, table, mapper_kw
            )

    def __init__(
        self,
        registry: _RegistryType,
        cls_: Type[Any],
        mapper_kw: _MapperKwArgs,
    ):
        """Establish base state and register ``cls_`` for instrumentation.

        :param registry: the registry to map the class with; its
         ``constructor`` is passed along as the ``init_method``.
        :param cls_: the class to be mapped; must be a ``type``.
        :param mapper_kw: accepted for signature compatibility with
         subclasses; not used here.
        """
        self.cls = util.assert_arg_type(cls_, type, "cls_")
        self.classname = cls_.__name__
        self.properties = util.OrderedDict()
        self.declared_attr_reg = {}

        instrumentation.register_class(
            self.cls,
            finalize=False,
            registry=registry,
            declarative_scan=self,
            init_method=registry.constructor,
        )

    def set_cls_attribute(self, attrname: str, value: _T) -> _T:
        """Install ``value`` on the mapped class under ``attrname`` by way
        of the class manager, and return ``value``.
        """
        manager = instrumentation.manager_of_class(self.cls)
        manager.install_member(attrname, value)
        return value

    def map(self, mapper_kw: _MapperKwArgs = ...) -> Mapper[Any]:
        """Produce the ``Mapper`` for the class; implemented by subclasses.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()

    def _early_mapping(self, mapper_kw: _MapperKwArgs) -> None:
        """Map the class right away by invoking :meth:`.map`."""
        self.map(mapper_kw)
348
349
class _ImperativeMapperConfig(_MapperConfig):
    """Mapper config used for imperative mappings, where the table and the
    mapper arguments are passed in explicitly.
    """

    __slots__ = ("local_table", "inherits")

    def __init__(
        self,
        registry: _RegistryType,
        cls_: Type[_O],
        table: Optional[FromClause],
        mapper_kw: _MapperKwArgs,
    ):
        """Install ``table`` on the class as ``__table__``, add the class
        to the registry's class registry, determine the class to inherit
        from and map the class.
        """
        super().__init__(registry, cls_, mapper_kw)

        self.local_table = self.set_cls_attribute("__table__", table)

        with mapperlib._CONFIGURE_MUTEX:
            class_registry = registry._class_registry
            clsregistry._add_class(self.classname, self.cls, class_registry)

            self._setup_inheritance(mapper_kw)

            self._early_mapping(mapper_kw)

    def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]:
        """Construct the ``Mapper`` for the class against ``local_table``
        and install it on the class as ``__mapper__``.
        """
        new_mapper = Mapper(self.cls, self.local_table, **mapper_kw)
        return self.set_cls_attribute("__mapper__", new_mapper)

    def _setup_inheritance(self, mapper_kw: _MapperKwArgs) -> None:
        """Establish ``self.inherits``.

        An explicit ``inherits`` mapper argument wins; a ``Mapper`` given
        there is replaced by its class.  Otherwise the direct bases of the
        class are searched for a single mapped base.

        :raises InvalidRequestError: if more than one mapped base is found.
        """
        explicit = mapper_kw.get("inherits", None)

        if explicit is not None:
            if isinstance(explicit, Mapper):
                explicit = explicit.class_
            self.inherits = explicit
            return

        # since we search for classical mappings now, search for
        # multiple mapped bases as well and raise an error.
        candidates = []
        for parent in self.cls.__bases__:
            resolved = _resolve_for_abstract_or_classical(parent)
            if (
                resolved is None
                or not _is_supercls_for_inherits(resolved)
                or resolved in candidates
            ):
                continue
            candidates.append(resolved)

        if len(candidates) > 1:
            raise exc.InvalidRequestError(
                "Class %s has multiple mapped bases: %r"
                % (self.cls, candidates)
            )

        self.inherits = candidates[0] if candidates else None
409
410
class _CollectedAnnotation(NamedTuple):
    """Record of an annotation collected for one attribute name.

    Instances are the values of
    ``_ClassScanMapperConfig.collected_annotations``, which is keyed by
    attribute name.
    """

    # the annotation as collected; used for the dataclass field when
    # ``extracted_mapped_annotation`` is falsy
    raw_annotation: _AnnotationScanType
    # ``None`` means the class scan does not treat the attribute as mapped
    mapped_container: Optional[Type[Mapped[Any]]]
    # preferred over ``raw_annotation`` for the dataclass field when truthy
    extracted_mapped_annotation: Union[_AnnotationScanType, str]
    # NOTE(review): presumably whether the attribute originated from a
    # dataclass field / declared_attr - confirm against
    # ``_collect_annotation()``
    is_dataclass: bool
    # value the class scan uses for the attribute when the class itself
    # supplies no object for the name
    attr_value: Any
    # NOTE(review): presumably the ``__module__`` of ``originating_class``
    # - confirm against ``_collect_annotation()``
    originating_module: str
    # class the annotation was collected from; checked when warning about
    # attributes that come from non-dataclass superclasses
    originating_class: Type[Any]
419
420
class _ClassScanMapperConfig(_MapperConfig):
    """Mapper config which scans the class and its ``__mro__`` for the
    attributes, annotations and declarative directives to be mapped.
    """

    __slots__ = (
        "registry",
        "clsdict_view",
        "collected_attributes",
        "collected_annotations",
        "local_table",
        "persist_selectable",
        "declared_columns",
        "column_ordering",
        "column_copies",
        "table_args",
        "tablename",
        "mapper_args",
        "mapper_args_fn",
        "table_fn",
        "inherits",
        "single",
        "allow_dataclass_fields",
        "dataclass_setup_arguments",
        "is_dataclass_prior_to_mapping",
        "allow_unmapped_annotations",
    )

    is_deferred = False
    registry: _RegistryType
    # immutable view of the class dictionary, taken before instrumentation
    clsdict_view: _ClassDict
    # annotations gathered by the class scan, keyed by attribute name
    collected_annotations: Dict[str, _CollectedAnnotation]
    # attribute objects gathered by the class scan, keyed by attribute name
    collected_attributes: Dict[str, Any]
    local_table: Optional[FromClause]
    persist_selectable: Optional[FromClause]
    declared_columns: util.OrderedSet[Column[Any]]
    column_ordering: Dict[Column[Any], int]
    column_copies: Dict[
        Union[MappedColumn[Any], Column[Any]],
        Union[MappedColumn[Any], Column[Any]],
    ]
    tablename: Optional[str]
    mapper_args: Mapping[str, Any]
    table_args: Optional[_TableArgsType]
    # callable returning a copy of ``__mapper_args__``; invocation is
    # deferred until after the class scan
    mapper_args_fn: Optional[Callable[[], Dict[str, Any]]]
    inherits: Optional[Type[Any]]
    single: bool

    # result of ``dataclasses.is_dataclass()`` taken at construction time
    is_dataclass_prior_to_mapping: bool
    # True if the class has a truthy ``__allow_unmapped__`` or has
    # dataclass setup arguments
    allow_unmapped_annotations: bool

    dataclass_setup_arguments: Optional[_DataclassArguments]
    """if the class has SQLAlchemy native dataclass parameters, where
    we will turn the class into a dataclass within the declarative mapping
    process.

    """

    allow_dataclass_fields: bool
    """if true, look for dataclass-processed Field objects on the target
    class as well as superclasses and extract ORM mapping directives from
    the "metadata" attribute of each Field.

    if False, dataclass fields can still be used, however they won't be
    mapped.

    """
484
    def __init__(
        self,
        registry: _RegistryType,
        cls_: Type[_O],
        dict_: _ClassDict,
        table: Optional[FromClause],
        mapper_kw: _MapperKwArgs,
    ):
        """Scan ``cls_`` and carry out the declarative mapping steps.

        :param registry: the registry to map the class with; the class is
         added to its ``_class_registry``.
        :param cls_: the class being mapped.
        :param dict_: the class dictionary; an immutable copy of it is
         kept as ``clsdict_view``.
        :param table: passed along to ``_setup_table()``.
        :param mapper_kw: mapper keyword arguments, passed along to the
         inheritance setup steps and to ``_early_mapping()``.
        :raises InvalidRequestError: if ``__sa_dataclass_metadata_key__``
         is present while the class is not already a dataclass, or while
         ``_sa_apply_dc_transforms`` is set on the class.
        """
        # grab class dict before the instrumentation manager has been added.
        # reduces cycles
        self.clsdict_view = (
            util.immutabledict(dict_) if dict_ else util.EMPTY_DICT
        )
        super().__init__(registry, cls_, mapper_kw)
        self.registry = registry
        self.persist_selectable = None

        self.collected_attributes = {}
        self.collected_annotations = {}
        self.declared_columns = util.OrderedSet()
        self.column_ordering = {}
        self.column_copies = {}
        self.single = False
        # dca: the SQLAlchemy dataclass arguments for the class, if any
        self.dataclass_setup_arguments = dca = getattr(
            self.cls, "_sa_apply_dc_transforms", None
        )

        self.allow_unmapped_annotations = getattr(
            self.cls, "__allow_unmapped__", False
        ) or bool(self.dataclass_setup_arguments)

        # cld: whether the class is a dataclass before we do anything to it
        self.is_dataclass_prior_to_mapping = cld = dataclasses.is_dataclass(
            cls_
        )

        # sdk: value of __sa_dataclass_metadata_key__, if present
        sdk = _get_immediate_cls_attr(cls_, "__sa_dataclass_metadata_key__")

        # we don't want to consume Field objects from a not-already-dataclass.
        # the Field objects won't have their "name" or "type" populated,
        # and while it seems like we could just set these on Field as we
        # read them, Field is documented as "user read only" and we need to
        # stay far away from any off-label use of dataclasses APIs.
        if (not cld or dca) and sdk:
            raise exc.InvalidRequestError(
                "SQLAlchemy mapped dataclasses can't consume mapping "
                "information from dataclass.Field() objects if the immediate "
                "class is not already a dataclass."
            )

        # if already a dataclass, and __sa_dataclass_metadata_key__ present,
        # then also look inside of dataclass.Field() objects yielded by
        # dataclasses.get_fields(cls) when scanning for attributes
        self.allow_dataclass_fields = bool(sdk and cld)

        self._setup_declared_events()

        self._scan_attributes()

        self._setup_dataclasses_transforms()

        # the remaining steps run while holding the configure mutex
        with mapperlib._CONFIGURE_MUTEX:
            clsregistry._add_class(
                self.classname, self.cls, registry._class_registry
            )

            self._setup_inheriting_mapper(mapper_kw)

            self._extract_mappable_attributes()

            self._extract_declared_columns()

            self._setup_table(table)

            self._setup_inheriting_columns(mapper_kw)

            self._early_mapping(mapper_kw)
561
562 def _setup_declared_events(self) -> None:
563 if _get_immediate_cls_attr(self.cls, "__declare_last__"):
564
565 @event.listens_for(Mapper, "after_configured")
566 def after_configured() -> None:
567 cast(
568 "_DeclMappedClassProtocol[Any]", self.cls
569 ).__declare_last__()
570
571 if _get_immediate_cls_attr(self.cls, "__declare_first__"):
572
573 @event.listens_for(Mapper, "before_configured")
574 def before_configured() -> None:
575 cast(
576 "_DeclMappedClassProtocol[Any]", self.cls
577 ).__declare_first__()
578
    def _cls_attr_override_checker(
        self, cls: Type[_O]
    ) -> Callable[[str, Any], bool]:
        """Produce a function that checks if a class has overridden an
        attribute, taking SQLAlchemy-enabled dataclass fields into account.

        :param cls: the class against which attributes are checked.
        :return: a callable ``attribute_is_overridden(key, obj)`` which
         returns True if the attribute named ``key`` on ``cls`` is
         something other than ``obj``.
        """

        # dataclass fields are only consulted when allowed for this config
        # and the class provides a metadata key
        if self.allow_dataclass_fields:
            sa_dataclass_metadata_key = _get_immediate_cls_attr(
                cls, "__sa_dataclass_metadata_key__"
            )
        else:
            sa_dataclass_metadata_key = None

        if not sa_dataclass_metadata_key:

            # plain case: compare by identity against normal class access
            def attribute_is_overridden(key: str, obj: Any) -> bool:
                return getattr(cls, key, obj) is not obj

        else:
            # values stored under the metadata key, for all dataclass
            # fields of the class and for the local ones only
            all_datacls_fields = {
                f.name: f.metadata[sa_dataclass_metadata_key]
                for f in util.dataclass_fields(cls)
                if sa_dataclass_metadata_key in f.metadata
            }
            local_datacls_fields = {
                f.name: f.metadata[sa_dataclass_metadata_key]
                for f in util.local_dataclass_fields(cls)
                if sa_dataclass_metadata_key in f.metadata
            }

            # sentinel distinguishing "no such key" from any stored value
            absent = object()

            def attribute_is_overridden(key: str, obj: Any) -> bool:
                # declarative props are compared by their underlying
                # function
                if _is_declarative_props(obj):
                    obj = obj.fget

                # this function likely has some failure modes still if
                # someone is doing a deep mixing of the same attribute
                # name as plain Python attribute vs. dataclass field.

                # first, the fields local to the class
                ret = local_datacls_fields.get(key, absent)
                if _is_declarative_props(ret):
                    ret = ret.fget

                if ret is obj:
                    return False
                elif ret is not absent:
                    return True

                all_field = all_datacls_fields.get(key, absent)

                # next, normal class attribute access
                ret = getattr(cls, key, obj)

                if ret is obj:
                    return False

                # for dataclasses, this could be the
                # 'default' of the field. so filter more specifically
                # for an already-mapped InstrumentedAttribute
                if ret is not absent and isinstance(
                    ret, InstrumentedAttribute
                ):
                    return True

                # last, the fields of the class overall
                if all_field is obj:
                    return False
                elif all_field is not absent:
                    return True

                # can't find another attribute
                return False

        return attribute_is_overridden
654
    # names which the attribute scan does consider, even though they match
    # ``_match_exclude_dunders``
    _include_dunders = {
        "__table__",
        "__mapper_args__",
        "__tablename__",
        "__table_args__",
    }

    # matches names beginning with "_sa_" or "__"; ``_cls_attr_resolver()``
    # skips such names unless they are listed in ``_include_dunders``
    _match_exclude_dunders = re.compile(r"^(?:_sa_|__)")
663
    def _cls_attr_resolver(
        self, cls: Type[Any]
    ) -> Callable[[], Iterable[Tuple[str, Any, Any, bool]]]:
        """produce a function to iterate the "attributes" of a class
        which we want to consider for mapping, adjusting for SQLAlchemy fields
        embedded in dataclass fields.

        :param cls: the class whose local attributes are to be iterated.
        :return: a callable taking no arguments which produces 4-tuples of
         ``(name, object, annotation, is_dataclass_field)``.
        """
        cls_annotations = util.get_annotations(cls)

        cls_vars = vars(cls)

        _include_dunders = self._include_dunders
        _match_exclude_dunders = self._match_exclude_dunders

        # names from the class namespace merged with the annotated names,
        # leaving out "_sa_" / dunder names other than those explicitly
        # included
        names = [
            n
            for n in util.merge_lists_w_ordering(
                list(cls_vars), list(cls_annotations)
            )
            if not _match_exclude_dunders.match(n) or n in _include_dunders
        ]

        if self.allow_dataclass_fields:
            sa_dataclass_metadata_key: Optional[str] = _get_immediate_cls_attr(
                cls, "__sa_dataclass_metadata_key__"
            )
        else:
            sa_dataclass_metadata_key = None

        if not sa_dataclass_metadata_key:

            # plain case: the object is whatever the class namespace has
            # for the name (None if only annotated); never a dataclass field
            def local_attributes_for_class() -> (
                Iterable[Tuple[str, Any, Any, bool]]
            ):
                return (
                    (
                        name,
                        cls_vars.get(name),
                        cls_annotations.get(name),
                        False,
                    )
                    for name in names
                )

        else:
            dataclass_fields = {
                field.name: field for field in util.local_dataclass_fields(cls)
            }

            # separate name holding the same value; within this branch the
            # key is known to be truthy
            fixed_sa_dataclass_metadata_key = sa_dataclass_metadata_key

            def local_attributes_for_class() -> (
                Iterable[Tuple[str, Any, Any, bool]]
            ):
                for name in names:
                    field = dataclass_fields.get(name, None)
                    if field and sa_dataclass_metadata_key in field.metadata:
                        # dataclass field carrying mapping metadata; the
                        # object is derived from the field's metadata
                        yield field.name, _as_dc_declaredattr(
                            field.metadata, fixed_sa_dataclass_metadata_key
                        ), cls_annotations.get(field.name), True
                    else:
                        yield name, cls_vars.get(name), cls_annotations.get(
                            name
                        ), False

        return local_attributes_for_class
731
    def _scan_attributes(self) -> None:
        """Scan the class and each class in its ``__mro__`` for attributes.

        Populates ``collected_attributes`` and ``column_copies``, records
        annotations through ``_collect_annotation()``, and establishes
        ``table_args``, ``tablename``, ``mapper_args_fn`` and ``table_fn``
        from the ``__table_args__``, ``__tablename__``, ``__mapper_args__``
        and ``__table__`` directives.

        :raises ArgumentError: if ``__table_args__`` is not a tuple, a dict
         or None.
        :raises InvalidRequestError: if a ``MapperProperty`` is found
         directly on a non-mapped superclass.
        """
        cls = self.cls

        cls_as_Decl = cast("_DeclMappedClassProtocol[Any]", cls)

        clsdict_view = self.clsdict_view
        collected_attributes = self.collected_attributes
        column_copies = self.column_copies
        _include_dunders = self._include_dunders
        mapper_args_fn = None
        table_args = inherited_table_args = None
        table_fn = None
        tablename = None
        # True if the class itself declares __table__
        fixed_table = "__table__" in clsdict_view

        attribute_is_overridden = self._cls_attr_override_checker(self.cls)

        bases = []

        for base in cls.__mro__:
            # collect bases and make sure standalone columns are copied
            # to be the column they will ultimately be on the class,
            # so that declared_attr functions use the right columns.
            # need to do this all the way up the hierarchy first
            # (see #8190)

            class_mapped = base is not cls and _is_supercls_for_inherits(base)

            local_attributes_for_class = self._cls_attr_resolver(base)

            if not class_mapped and base is not cls:
                locally_collected_columns = self._produce_column_copies(
                    local_attributes_for_class,
                    attribute_is_overridden,
                    fixed_table,
                    base,
                )
            else:
                locally_collected_columns = {}

            bases.append(
                (
                    base,
                    class_mapped,
                    local_attributes_for_class,
                    locally_collected_columns,
                )
            )

        for (
            base,
            class_mapped,
            local_attributes_for_class,
            locally_collected_columns,
        ) in bases:
            # this transfer can also take place as we scan each name
            # for finer-grained control of how collected_attributes is
            # populated, as this is what impacts column ordering.
            # however it's simpler to get it out of the way here.
            collected_attributes.update(locally_collected_columns)

            for (
                name,
                obj,
                annotation,
                is_dataclass_field,
            ) in local_attributes_for_class():
                if name in _include_dunders:
                    # one of the four declarative directives; for each,
                    # the first one encountered in MRO order wins
                    if name == "__mapper_args__":
                        check_decl = _check_declared_props_nocascade(
                            obj, name, cls
                        )
                        if not mapper_args_fn and (
                            not class_mapped or check_decl
                        ):
                            # don't even invoke __mapper_args__ until
                            # after we've determined everything about the
                            # mapped table.
                            # make a copy of it so a class-level dictionary
                            # is not overwritten when we update column-based
                            # arguments.
                            def _mapper_args_fn() -> Dict[str, Any]:
                                return dict(cls_as_Decl.__mapper_args__)

                            mapper_args_fn = _mapper_args_fn

                    elif name == "__tablename__":
                        check_decl = _check_declared_props_nocascade(
                            obj, name, cls
                        )
                        if not tablename and (not class_mapped or check_decl):
                            tablename = cls_as_Decl.__tablename__
                    elif name == "__table__":
                        check_decl = _check_declared_props_nocascade(
                            obj, name, cls
                        )
                        # if a @declared_attr using "__table__" is detected,
                        # wrap up a callable to look for "__table__" from
                        # the final concrete class when we set up a table.
                        # this was fixed by
                        # #11509, regression in 2.0 from version 1.4.
                        if check_decl and not table_fn:
                            # don't even invoke __table__ until we're ready
                            def _table_fn() -> FromClause:
                                return cls_as_Decl.__table__

                            table_fn = _table_fn

                    elif name == "__table_args__":
                        check_decl = _check_declared_props_nocascade(
                            obj, name, cls
                        )
                        if not table_args and (not class_mapped or check_decl):
                            table_args = cls_as_Decl.__table_args__
                            if not isinstance(
                                table_args, (tuple, dict, type(None))
                            ):
                                raise exc.ArgumentError(
                                    "__table_args__ value must be a tuple, "
                                    "dict, or None"
                                )
                            # remember that the table args came from a
                            # superclass; see the check after the loop
                            if base is not cls:
                                inherited_table_args = True
                    else:
                        # any other dunder names; should not be here
                        # as we have tested for all four names in
                        # _include_dunders
                        assert False
                elif class_mapped:
                    # attributes of a mapped superclass are not collected
                    if _is_declarative_props(obj) and not obj._quiet:
                        util.warn(
                            "Regular (i.e. not __special__) "
                            "attribute '%s.%s' uses @declared_attr, "
                            "but owning class %s is mapped - "
                            "not applying to subclass %s."
                            % (base.__name__, name, base, cls)
                        )

                    continue
                elif base is not cls:
                    # we're a mixin, abstract base, or something that is
                    # acting like that for now.

                    if isinstance(obj, (Column, MappedColumn)):
                        # already copied columns to the mapped class.
                        continue
                    elif isinstance(obj, MapperProperty):
                        raise exc.InvalidRequestError(
                            "Mapper properties (i.e. deferred,"
                            "column_property(), relationship(), etc.) must "
                            "be declared as @declared_attr callables "
                            "on declarative mixin classes. For dataclass "
                            "field() objects, use a lambda:"
                        )
                    elif _is_declarative_props(obj):
                        # tried to get overloads to tell this to
                        # pylance, no luck
                        assert obj is not None

                        if obj._cascading:
                            if name in clsdict_view:
                                # unfortunately, while we can use the user-
                                # defined attribute here to allow a clean
                                # override, if there's another
                                # subclass below then it still tries to use
                                # this. not sure if there is enough
                                # information here to add this as a feature
                                # later on.
                                util.warn(
                                    "Attribute '%s' on class %s cannot be "
                                    "processed due to "
                                    "@declared_attr.cascading; "
                                    "skipping" % (name, cls)
                                )
                            # invoke the declared_attr for this class and
                            # install the result on the class
                            collected_attributes[name] = column_copies[obj] = (
                                ret
                            ) = obj.__get__(obj, cls)
                            setattr(cls, name, ret)
                        else:
                            if is_dataclass_field:
                                # access attribute using normal class access
                                # first, to see if it's been mapped on a
                                # superclass. note if the dataclasses.field()
                                # has "default", this value can be anything.
                                ret = getattr(cls, name, None)

                                # so, if it's anything that's not ORM
                                # mapped, assume we should invoke the
                                # declared_attr
                                if not isinstance(ret, InspectionAttr):
                                    ret = obj.fget()
                            else:
                                # access attribute using normal class access.
                                # if the declared attr already took place
                                # on a superclass that is mapped, then
                                # this is no longer a declared_attr, it will
                                # be the InstrumentedAttribute
                                ret = getattr(cls, name)

                            # correct for proxies created from hybrid_property
                            # or similar. note there is no known case that
                            # produces nested proxies, so we are only
                            # looking one level deep right now.

                            if (
                                isinstance(ret, InspectionAttr)
                                and attr_is_internal_proxy(ret)
                                and not isinstance(
                                    ret.original_property, MapperProperty
                                )
                            ):
                                ret = ret.descriptor

                            collected_attributes[name] = column_copies[obj] = (
                                ret
                            )

                        # carry the docstring of the declared_attr over
                        # when the result has none of its own
                        if (
                            isinstance(ret, (Column, MapperProperty))
                            and ret.doc is None
                        ):
                            ret.doc = obj.__doc__

                        self._collect_annotation(
                            name,
                            obj._collect_return_annotation(),
                            base,
                            True,
                            obj,
                        )
                    elif _is_mapped_annotation(annotation, cls, base):
                        # Mapped annotation without any object.
                        # product_column_copies should have handled this.
                        # if future support for other MapperProperty,
                        # then test if this name is already handled and
                        # otherwise proceed to generate.
                        if not fixed_table:
                            assert (
                                name in collected_attributes
                                or attribute_is_overridden(name, None)
                            )
                        continue
                    else:
                        # here, the attribute is some other kind of
                        # property that we assume is not part of the
                        # declarative mapping. however, check for some
                        # more common mistakes
                        self._warn_for_decl_attributes(base, name, obj)
                elif is_dataclass_field and (
                    name not in clsdict_view or clsdict_view[name] is not obj
                ):
                    # here, we are definitely looking at the target class
                    # and not a superclass. this is currently a
                    # dataclass-only path. if the name is only
                    # a dataclass field and isn't in local cls.__dict__,
                    # put the object there.
                    # assert that the dataclass-enabled resolver agrees
                    # with what we are seeing

                    assert not attribute_is_overridden(name, obj)

                    if _is_declarative_props(obj):
                        obj = obj.fget()

                    collected_attributes[name] = obj
                    self._collect_annotation(
                        name, annotation, base, False, obj
                    )
                else:
                    # remaining case: an attribute of the target class
                    # itself
                    collected_annotation = self._collect_annotation(
                        name, annotation, base, None, obj
                    )
                    is_mapped = (
                        collected_annotation is not None
                        and collected_annotation.mapped_container is not None
                    )
                    generated_obj = (
                        collected_annotation.attr_value
                        if collected_annotation is not None
                        else obj
                    )
                    if obj is None and not fixed_table and is_mapped:
                        # annotation only; use the value that came with
                        # the collected annotation, else a new MappedColumn
                        collected_attributes[name] = (
                            generated_obj
                            if generated_obj is not None
                            else MappedColumn()
                        )
                    elif name in clsdict_view:
                        collected_attributes[name] = obj
                    # else if the name is not in the cls.__dict__,
                    # don't collect it as an attribute.
                    # we will see the annotation only, which is meaningful
                    # both for mapping and dataclasses setup

        # table args taken from a superclass are dropped when no table name
        # was established
        if inherited_table_args and not tablename:
            table_args = None

        self.table_args = table_args
        self.tablename = tablename
        self.mapper_args_fn = mapper_args_fn
        self.table_fn = table_fn
1033
    def _setup_dataclasses_transforms(self) -> None:
        """Turn the class into a dataclass, if dataclass setup arguments
        are present for it; otherwise do nothing.

        Builds the annotations and default values from the collected
        annotations and attributes, sets the defaults on the class and
        hands off to ``_apply_dataclasses_to_any_class()``.

        :raises InvalidRequestError: if the class already has
         ``__dataclass_fields__`` or ``__table__`` in its own ``__dict__``.
        """
        dataclass_setup_arguments = self.dataclass_setup_arguments
        if not dataclass_setup_arguments:
            return

        # can't use is_dataclass since it uses hasattr
        if "__dataclass_fields__" in self.cls.__dict__:
            raise exc.InvalidRequestError(
                f"Class {self.cls} is already a dataclass; ensure that "
                "base classes / decorator styles of establishing dataclasses "
                "are not being mixed. "
                "This can happen if a class that inherits from "
                "'MappedAsDataclass', even indirectly, is been mapped with "
                "'@registry.mapped_as_dataclass'"
            )

        # can't create a dataclass if __table__ is already there. This would
        # fail an assertion when calling _get_arguments_for_make_dataclass:
        # assert False, "Mapped[] received without a mapping declaration"
        if "__table__" in self.cls.__dict__:
            raise exc.InvalidRequestError(
                f"Class {self.cls} already defines a '__table__'. "
                "ORM Annotated Dataclasses do not support a pre-existing "
                "'__table__' element"
            )

        # attribute names to warn about, keyed by originating class
        warn_for_non_dc_attrs = collections.defaultdict(list)

        def _allow_dataclass_field(
            key: str, originating_class: Type[Any]
        ) -> bool:
            # always allows the field; as a side effect, takes note of
            # attributes that come from a superclass which is not itself
            # a dataclass
            if (
                originating_class is not self.cls
                and "__dataclass_fields__" not in originating_class.__dict__
            ):
                warn_for_non_dc_attrs[originating_class].append(key)

            return True

        manager = instrumentation.manager_of_class(self.cls)
        assert manager is not None

        # one entry per collected annotation that passes the filter below;
        # the extracted mapped annotation is preferred over the raw one
        field_list = [
            _AttributeOptions._get_arguments_for_make_dataclass(
                self,
                key,
                anno,
                mapped_container,
                self.collected_attributes.get(key, _NoArg.NO_ARG),
                dataclass_setup_arguments,
            )
            for key, anno, mapped_container in (
                (
                    key,
                    mapped_anno if mapped_anno else raw_anno,
                    mapped_container,
                )
                for key, (
                    raw_anno,
                    mapped_container,
                    mapped_anno,
                    is_dc,
                    attr_value,
                    originating_module,
                    originating_class,
                ) in self.collected_annotations.items()
                if _allow_dataclass_field(key, originating_class)
                and (
                    key not in self.collected_attributes
                    # issue #9226; check for attributes that we've collected
                    # which are already instrumented, which we would assume
                    # mean we are in an ORM inheritance mapping and this
                    # attribute is already mapped on the superclass. Under
                    # no circumstance should any QueryableAttribute be sent to
                    # the dataclass() function; anything that's mapped should
                    # be Field and that's it
                    or not isinstance(
                        self.collected_attributes[key], QueryableAttribute
                    )
                )
            )
        ]
        if warn_for_non_dc_attrs:
            for (
                originating_class,
                non_dc_attrs,
            ) in warn_for_non_dc_attrs.items():
                util.warn_deprecated(
                    f"When transforming {self.cls} to a dataclass, "
                    f"attribute(s) "
                    f"{', '.join(repr(key) for key in non_dc_attrs)} "
                    f"originates from superclass "
                    f"{originating_class}, which is not a dataclass. This "
                    f"usage is deprecated and will raise an error in "
                    f"SQLAlchemy 2.1. When declaring SQLAlchemy Declarative "
                    f"Dataclasses, ensure that all mixin classes and other "
                    f"superclasses which include attributes are also a "
                    f"subclass of MappedAsDataclass.",
                    "2.0",
                    code="dcmx",
                )

        # each item is (name, type) or (name, type, spec); a spec, when
        # present, is set on the class as the attribute's value
        annotations = {}
        defaults = {}
        for item in field_list:
            if len(item) == 2:
                name, tp = item
            elif len(item) == 3:
                name, tp, spec = item
                defaults[name] = spec
            else:
                assert False
            annotations[name] = tp

        for k, v in defaults.items():
            setattr(self.cls, k, v)

        self._apply_dataclasses_to_any_class(
            dataclass_setup_arguments, self.cls, annotations
        )
1154
@classmethod
def _update_annotations_for_non_mapped_class(
    cls, klass: Type[_O]
) -> Mapping[str, _AnnotationScanType]:
    """Build a replacement annotations mapping for ``klass``.

    Annotations that ``_is_mapped_annotation`` does not recognize are
    copied through unchanged.  Recognized ones are replaced by the first
    element of what ``_extract_mapped_subtype`` returns; if that call
    returns a falsy result the name is left out of the mapping entirely.
    """
    rewritten = {}
    for attr_name, anno in util.get_annotations(klass).items():
        if not _is_mapped_annotation(anno, klass, klass):
            rewritten[attr_name] = anno
            continue

        result = _extract_mapped_subtype(
            anno,
            klass,
            klass.__module__,
            attr_name,
            type(None),
            required=False,
            is_dataclass_field=False,
            expect_mapped=False,
        )
        if result:
            unwrapped, _container = result
            rewritten[attr_name] = unwrapped
    return rewritten
1180
@classmethod
def _apply_dataclasses_to_any_class(
    cls,
    dataclass_setup_arguments: _DataclassArguments,
    klass: Type[_O],
    use_annotations: Mapping[str, _AnnotationScanType],
) -> None:
    """Invoke the dataclass callable on ``klass``.

    The callable is ``dataclass_setup_arguments["dataclass_callable"]``,
    or ``dataclasses.dataclass`` when that entry is ``_NoArg.NO_ARG``.
    Every other entry that is not ``_NoArg.NO_ARG`` is forwarded as a
    keyword argument.

    When ``use_annotations`` is non-empty it temporarily replaces
    ``klass.__annotations__`` for the duration of the call; the previous
    value is put back (or the attribute deleted when there was none)
    whether or not the call succeeds.

    :raises sqlalchemy.exc.InvalidRequestError: wrapping any ``TypeError``
     or ``ValueError`` raised by the dataclass callable.
    """
    cls._assert_dc_arguments(dataclass_setup_arguments)

    make_dataclass = dataclass_setup_arguments["dataclass_callable"]
    if make_dataclass is _NoArg.NO_ARG:
        make_dataclass = dataclasses.dataclass

    swap_annotations = bool(use_annotations)
    saved_annotations: Optional[Any] = None

    if swap_annotations:
        # present annotations that look "normal" to a dataclasses
        # callable: no Mapped[] container, and an annotation for every
        # Field entry
        saved_annotations = getattr(klass, "__annotations__", None)
        klass.__annotations__ = cast("Dict[str, Any]", use_annotations)

    try:
        dc_kwargs = {
            argname: argvalue
            for argname, argvalue in dataclass_setup_arguments.items()
            if argvalue is not _NoArg.NO_ARG
            and argname != "dataclass_callable"
        }
        make_dataclass(klass, **dc_kwargs)
    except (TypeError, ValueError) as ex:
        raise exc.InvalidRequestError(
            f"Python dataclasses error encountered when creating "
            f"dataclass for {klass.__name__!r}: "
            f"{ex!r}. Please refer to Python dataclasses "
            "documentation for additional information.",
            code="dcte",
        ) from ex
    finally:
        # put the original annotations back once dataclasses is done;
        # for mixins and __abstract__ superclasses, Declarative still
        # needs to see the Mapped[] container in order to map subclasses
        if swap_annotations:
            if saved_annotations is None:
                del klass.__annotations__
            else:
                klass.__annotations__ = saved_annotations
1234
@classmethod
def _assert_dc_arguments(cls, arguments: _DataclassArguments) -> None:
    """Validate the keys of a dataclass-arguments mapping.

    :raises sqlalchemy.exc.ArgumentError: naming, in sorted order, every
     key of ``arguments`` that is not one of the accepted names.
    """
    accepted = frozenset(
        (
            "init",
            "repr",
            "order",
            "eq",
            "unsafe_hash",
            "kw_only",
            "match_args",
            "dataclass_callable",
        )
    )
    unexpected = set(arguments) - accepted
    if not unexpected:
        return

    rendered = ", ".join(repr(argname) for argname in sorted(unexpected))
    raise exc.ArgumentError(
        f"Dataclass argument(s) {rendered} are not accepted"
    )
1253
def _collect_annotation(
    self,
    name: str,
    raw_annotation: _AnnotationScanType,
    originating_class: Type[Any],
    expect_mapped: Optional[bool],
    attr_value: Any,
) -> Optional[_CollectedAnnotation]:
    """Record, once per attribute name, what an annotation resolves to.

    An entry already present in ``self.collected_annotations`` for
    ``name`` is returned as-is.  ``None`` is returned when there is no
    annotation or when ``_extract_mapped_subtype`` yields nothing.
    Otherwise a new ``_CollectedAnnotation`` is stored under ``name`` and
    returned.

    When ``expect_mapped`` is ``None`` it is derived here: it is true only
    if ``attr_value`` is not a ``dataclasses.Field``, unmapped annotations
    are not allowed, and ``attr_value`` is ``None`` or a
    ``_MappedAttribute``.
    """
    collected = self.collected_annotations
    if name in collected:
        return collected[name]

    if raw_annotation is None:
        return None

    prior_dataclass = self.is_dataclass_prior_to_mapping
    unmapped_ok = self.allow_unmapped_annotations

    if expect_mapped is None:
        if isinstance(attr_value, dataclasses.Field) or unmapped_ok:
            expect_mapped = False
        else:
            expect_mapped = attr_value is None or isinstance(
                attr_value, _MappedAttribute
            )

    # is_dataclass_field is always passed as False to the extractor
    extracted = _extract_mapped_subtype(
        raw_annotation,
        self.cls,
        originating_class.__module__,
        name,
        type(attr_value),
        required=False,
        is_dataclass_field=False,
        expect_mapped=expect_mapped and not prior_dataclass,
    )
    if extracted is None:
        # ClassVar can come out here
        return None

    inner_annotation, container = extracted

    if attr_value is None and not is_literal(inner_annotation):
        for arg in get_args(inner_annotation):
            if is_fwd_ref(
                arg, check_generic=True, check_for_plain_string=True
            ):
                arg = de_stringify_annotation(
                    self.cls,
                    arg,
                    originating_class.__module__,
                    include_generic=True,
                )
            # an ORM construct may be embedded in Annotated[...], e.g.
            # Annotated[int, mapped_column(primary_key=True)]
            if isinstance(arg, _IntrospectsAnnotations):
                attr_value = arg.found_in_pep593_annotated()

    entry = _CollectedAnnotation(
        raw_annotation,
        container,
        inner_annotation,
        prior_dataclass,
        attr_value,
        originating_class.__module__,
        originating_class,
    )
    collected[name] = entry
    return entry
1325
def _warn_for_decl_attributes(
    self, cls: Type[Any], key: str, c: Any
) -> None:
    """Warn when ``c`` is a ``ColumnElement`` that will not be mapped.

    Values of any other type are ignored silently.
    """
    if not isinstance(c, expression.ColumnElement):
        return

    message = (
        f"Attribute '{key}' on class {cls} appears to "
        "be a non-schema SQLAlchemy expression "
        "object; this won't be part of the declarative mapping. "
        "To map arbitrary expressions, use ``column_property()`` "
        "or a similar function such as ``deferred()``, "
        "``query_expression()`` etc. "
    )
    util.warn(message)
1338
def _produce_column_copies(
    self,
    attributes_for_class: Callable[
        [], Iterable[Tuple[str, Any, Any, bool]]
    ],
    attribute_is_overridden: Callable[[str, Any], bool],
    fixed_table: bool,
    originating_class: Type[Any],
) -> Dict[str, Union[Column[Any], MappedColumn[Any]]]:
    """Place column attributes found on another class onto ``self.cls``.

    Iterates the ``(name, obj, annotation, is_dataclass)`` tuples produced
    by ``attributes_for_class()`` and handles two cases:

    * annotation-only attributes (``obj is None``) whose annotation is
      recognized by ``_is_mapped_annotation``, considered only when
      ``fixed_table`` is false: the value found via
      :meth:`_collect_annotation`, or a new ``MappedColumn()``, is set on
      the class;
    * ``Column`` / ``MappedColumn`` objects: a copy made with
      ``obj._copy()`` is set on the class and remembered in
      ``self.column_copies``, unless the name is already in the class
      dict view or the column is already present in a ``__table__``
      found there.

    Attributes for which ``attribute_is_overridden(name, obj)`` is true
    are skipped in both cases.

    :return: mapping of attribute name to the object that was set on
     ``self.cls`` by this call.
    :raises sqlalchemy.exc.InvalidRequestError: if a column to be copied
     has a foreign key whose target column is not bound to a table.
    """
    cls = self.cls
    dict_ = self.clsdict_view
    locally_collected_attributes = {}
    column_copies = self.column_copies
    # copy mixin columns to the mapped class

    for name, obj, annotation, is_dataclass in attributes_for_class():
        if (
            not fixed_table
            and obj is None
            and _is_mapped_annotation(annotation, cls, originating_class)
        ):
            # obj is None means this is the annotation only path

            if attribute_is_overridden(name, obj):
                # perform same "overridden" check as we do for
                # Column/MappedColumn, this is how a mixin col is not
                # applied to an inherited subclass that does not have
                # the mixin. the anno-only path added here for
                # #9564
                continue

            collected_annotation = self._collect_annotation(
                name, annotation, originating_class, True, obj
            )
            # prefer a value discovered inside the annotation itself
            obj = (
                collected_annotation.attr_value
                if collected_annotation is not None
                else obj
            )
            if obj is None:
                # nothing found in the annotation; use a plain
                # MappedColumn
                obj = MappedColumn()

            locally_collected_attributes[name] = obj
            setattr(cls, name, obj)

        elif isinstance(obj, (Column, MappedColumn)):
            if attribute_is_overridden(name, obj):
                # if column has been overridden
                # (like by the InstrumentedAttribute of the
                # superclass), skip. don't collect the annotation
                # either (issue #8718)
                continue

            collected_annotation = self._collect_annotation(
                name, annotation, originating_class, True, obj
            )
            obj = (
                collected_annotation.attr_value
                if collected_annotation is not None
                else obj
            )

            # copy only when the class doesn't define this name itself
            # and the column isn't already in a __table__ given on the
            # class
            if name not in dict_ and not (
                "__table__" in dict_
                and (getattr(obj, "name", None) or name)
                in dict_["__table__"].c
            ):
                if obj.foreign_keys:
                    for fk in obj.foreign_keys:
                        if (
                            fk._table_column is not None
                            and fk._table_column.table is None
                        ):
                            raise exc.InvalidRequestError(
                                "Columns with foreign keys to "
                                "non-table-bound "
                                "columns must be declared as "
                                "@declared_attr callables "
                                "on declarative mixin classes. "
                                "For dataclass "
                                "field() objects, use a lambda:."
                            )

                # remember original -> copy so later steps can swap in
                # the copy
                column_copies[obj] = copy_ = obj._copy()

                locally_collected_attributes[name] = copy_
                setattr(cls, name, copy_)

    return locally_collected_attributes
1428
def _extract_mappable_attributes(self) -> None:
    """Sort ``self.collected_attributes`` into ``self.properties``.

    Each collected attribute, except names in ``self._include_dunders``,
    is examined in turn.  Values that end up being mapped are stored in
    ``self.properties`` under their attribute name; along the way:

    * a declarative-props object found directly on the class is replaced
      by ``getattr(cls, k)``;
    * a ``QueryableAttribute`` belonging to another class and stored
      under a different key becomes a ``SynonymProperty``;
    * one-element tuples holding a column-like object produce a warning
      and are skipped;
    * ``dataclasses.Field`` values are skipped when dataclass arguments
      are configured;
    * values that are neither ``Column`` nor ``_DCAttributeOptions`` are
      removed from the collection and, unless the class is "late mapped",
      set on the class as plain attributes;
    * ``_IntrospectsAnnotations`` values get ``declarative_scan()``
      called;
    * ``_DCAttributeOptions`` values that are not ``MapperProperty`` /
      ``_MapsColumns`` are removed from the collection and set on the
      class.

    :raises sqlalchemy.exc.InvalidRequestError: if a mappable value is
     assigned to the reserved name ``metadata``.
    :raises sqlalchemy.orm.exc.MappedAnnotationError: if
     ``declarative_scan()`` raises ``NameError``.
    :raises sqlalchemy.exc.ArgumentError: if an attribute carries
     dataclass arguments but no dataclass configuration is in effect.
    """
    cls = self.cls
    collected_attributes = self.collected_attributes

    our_stuff = self.properties

    _include_dunders = self._include_dunders

    late_mapped = _get_immediate_cls_attr(
        cls, "_sa_decl_prepare_nocascade", strict=True
    )

    allow_unmapped_annotations = self.allow_unmapped_annotations
    expect_annotations_wo_mapped = (
        allow_unmapped_annotations or self.is_dataclass_prior_to_mapping
    )

    look_for_dataclass_things = bool(self.dataclass_setup_arguments)

    # iterate over a snapshot of the keys; entries are popped from
    # collected_attributes inside the loop
    for k in list(collected_attributes):
        if k in _include_dunders:
            continue

        value = collected_attributes[k]

        if _is_declarative_props(value):
            # @declared_attr in collected_attributes only occurs here for a
            # @declared_attr that's directly on the mapped class;
            # for a mixin, these have already been evaluated
            if value._cascading:
                util.warn(
                    "Use of @declared_attr.cascading only applies to "
                    "Declarative 'mixin' and 'abstract' classes. "
                    "Currently, this flag is ignored on mapped class "
                    "%s" % self.cls
                )

            value = getattr(cls, k)

        elif (
            isinstance(value, QueryableAttribute)
            and value.class_ is not cls
            and value.key != k
        ):
            # detect a QueryableAttribute that's already mapped being
            # assigned elsewhere in userland, turn into a synonym()
            value = SynonymProperty(value.key)
            setattr(cls, k, value)

        if (
            isinstance(value, tuple)
            and len(value) == 1
            and isinstance(value[0], (Column, _MappedAttribute))
        ):
            util.warn(
                "Ignoring declarative-like tuple value of attribute "
                "'%s': possibly a copy-and-paste error with a comma "
                "accidentally placed at the end of the line?" % k
            )
            continue
        elif look_for_dataclass_things and isinstance(
            value, dataclasses.Field
        ):
            # we collected a dataclass Field; dataclasses would have
            # set up the correct state on the class
            continue
        elif not isinstance(value, (Column, _DCAttributeOptions)):
            # using @declared_attr for some object that
            # isn't Column/MapperProperty/_DCAttributeOptions; remove
            # from the clsdict_view
            # and place the evaluated value onto the class.
            collected_attributes.pop(k)
            self._warn_for_decl_attributes(cls, k, value)
            if not late_mapped:
                setattr(cls, k, value)
            continue
        # we expect to see the name 'metadata' in some valid cases;
        # however at this point we see it's assigned to something trying
        # to be mapped, so raise for that.
        # TODO: should "registry" here be also? might be too late
        # to change that now (2.0 betas)
        elif k in ("metadata",):
            raise exc.InvalidRequestError(
                f"Attribute name '{k}' is reserved when using the "
                "Declarative API."
            )
        elif isinstance(value, Column):
            # name the column after the attribute if it has no name yet;
            # when a copy of this column was recorded, the copy is the
            # one that is named
            _undefer_column_name(
                k, self.column_copies.get(value, value)  # type: ignore
            )
        else:
            if isinstance(value, _IntrospectsAnnotations):
                # the fallback tuple stands in for a missing
                # _CollectedAnnotation when no annotation was collected
                (
                    annotation,
                    mapped_container,
                    extracted_mapped_annotation,
                    is_dataclass,
                    attr_value,
                    originating_module,
                    originating_class,
                ) = self.collected_annotations.get(
                    k, (None, None, None, False, None, None, None)
                )

                # issue #8692 - don't do any annotation interpretation if
                # an annotation were present and a container such as
                # Mapped[] etc. were not used. If annotation is None,
                # do declarative_scan so that the property can raise
                # for required
                if (
                    mapped_container is not None
                    or annotation is None
                    # issue #10516: need to do declarative_scan even with
                    # a non-Mapped annotation if we are doing
                    # __allow_unmapped__, for things like col.name
                    # assignment
                    or allow_unmapped_annotations
                ):
                    try:
                        value.declarative_scan(
                            self,
                            self.registry,
                            cls,
                            originating_module,
                            k,
                            mapped_container,
                            annotation,
                            extracted_mapped_annotation,
                            is_dataclass,
                        )
                    except NameError as ne:
                        raise orm_exc.MappedAnnotationError(
                            f"Could not resolve all types within mapped "
                            f'annotation: "{annotation}". Ensure all '
                            f"types are written correctly and are "
                            f"imported within the module in use."
                        ) from ne
                else:
                    # assert that we were expecting annotations
                    # without Mapped[] were going to be passed.
                    # otherwise an error should have been raised
                    # by util._extract_mapped_subtype before we got here.
                    assert expect_annotations_wo_mapped

            if isinstance(value, _DCAttributeOptions):
                if (
                    value._has_dataclass_arguments
                    and not look_for_dataclass_things
                ):
                    # collect the names of the dataclass arguments that
                    # were actually given, for the error message
                    if isinstance(value, MapperProperty):
                        argnames = [
                            "init",
                            "default_factory",
                            "repr",
                            "default",
                        ]
                    else:
                        argnames = ["init", "default_factory", "repr"]

                    args = {
                        a
                        for a in argnames
                        if getattr(
                            value._attribute_options, f"dataclasses_{a}"
                        )
                        is not _NoArg.NO_ARG
                    }

                    raise exc.ArgumentError(
                        f"Attribute '{k}' on class {cls} includes "
                        f"dataclasses argument(s): "
                        f"{', '.join(sorted(repr(a) for a in args))} but "
                        f"class does not specify "
                        "SQLAlchemy native dataclass configuration."
                    )

                if not isinstance(value, (MapperProperty, _MapsColumns)):
                    # filter for _DCAttributeOptions objects that aren't
                    # MapperProperty / mapped_column(). Currently this
                    # includes AssociationProxy. pop it from the things
                    # we're going to map and set it up as a descriptor
                    # on the class.
                    collected_attributes.pop(k)

                    # Assoc Prox (or other descriptor object that may
                    # use _DCAttributeOptions) is usually here, except if
                    # 1. we're a
                    # dataclass, dataclasses would have removed the
                    # attr here or 2. assoc proxy is coming from a
                    # superclass, we want it to be direct here so it
                    # tracks state or 3. assoc prox comes from
                    # declared_attr, uncommon case
                    setattr(cls, k, value)
                    continue

        # everything that was not skipped or popped above gets mapped
        our_stuff[k] = value
1625
def _extract_declared_columns(self) -> None:
    """Pull ``Column`` objects out of ``self.properties``.

    Columns are added to ``self.declared_columns``; for ``_MapsColumns``
    objects their sort order is recorded in ``self.column_ordering``.
    Entries in ``self.properties`` are replaced or removed so that only
    what the mapper needs to see explicitly remains.  A warning is
    emitted for each column name that ends up referenced by more than
    one attribute key.
    """
    props = self.properties
    columns = self.declared_columns
    ordering = self.column_ordering
    keys_by_colname = collections.defaultdict(set)

    # snapshot the items; props is modified while walking it
    for attr_key, prop in list(props.items()):
        if isinstance(prop, _MapsColumns):
            assigned_prop = prop.mapper_property_to_assign
            if assigned_prop:
                props[attr_key] = assigned_prop
            else:
                # nothing to assign: currently this means a MappedColumn
                # that will hand us a Column instead
                del props[attr_key]

            for column, sort_order in prop.columns_to_assign:
                if not isinstance(prop, CompositeProperty):
                    keys_by_colname[column.name].add(attr_key)
                columns.add(column)

                # not asserting "column not in ordering" here so that
                # the warning below can take effect instead; see #9630
                ordering[column] = sort_order

                # for a MappedColumn stored under a key that differs
                # from the column's own key, map the Column explicitly
                # under the attribute key; otherwise Mapper uses the
                # column key
                if assigned_prop is None and attr_key != column.key:
                    props[attr_key] = column
        elif isinstance(prop, Column):
            # naming ("undefer") has happened earlier by this point
            assert prop.name is not None
            keys_by_colname[prop.name].add(attr_key)
            columns.add(prop)
            # same name as the key: drop it from the explicit
            # properties so the normal column-property assignment rules
            # (including precedence in multi-column ColumnProperties)
            # apply
            if attr_key == prop.key:
                del props[attr_key]

    for colname, attr_keys in keys_by_colname.items():
        if len(attr_keys) <= 1:
            continue
        util.warn(
            "On class %r, Column object %r named "
            "directly multiple times, "
            "only one will be used: %s. "
            "Consider using orm.synonym instead"
            % (self.classname, colname, (", ".join(sorted(attr_keys))))
        )
1684
def _setup_table(self, table: Optional[FromClause] = None) -> None:
    """Establish ``self.local_table`` for the class being mapped.

    When no table is supplied by the ``table`` argument, ``self.table_fn``
    or a ``__table__`` entry in the class dict view, and a table name is
    configured, a new table is constructed from ``self.declared_columns``
    (ordered by ``self.column_ordering``) plus ``self.table_args``, and
    set on the class as ``__table__``.  Otherwise the existing table is
    used and every declared column must already be part of it.

    :param table: optional pre-existing selectable to use as the local
     table.
    :raises sqlalchemy.exc.ArgumentError: if a declared column is not
     contained in the pre-existing table.
    """
    cls = self.cls
    cls_as_Decl = cast("MappedClassProtocol[Any]", cls)

    tablename = self.tablename
    table_args = self.table_args
    clsdict_view = self.clsdict_view
    declared_columns = self.declared_columns
    column_ordering = self.column_ordering

    manager = attributes.manager_of_class(cls)

    if (
        self.table_fn is None
        and "__table__" not in clsdict_view
        and table is None
    ):
        if hasattr(cls, "__table_cls__"):
            table_cls = cast(
                Type[Table],
                util.unbound_method_to_callable(cls.__table_cls__),  # type: ignore  # noqa: E501
            )
        else:
            table_cls = Table

        if tablename is not None:
            args: Tuple[Any, ...] = ()
            table_kw: Dict[str, Any] = {}

            if table_args:
                # copy the keyword dict: the autoload keys added below
                # must not be written into the dictionary object that
                # was supplied as the table arguments, which may be
                # shared (e.g. via a mixin) with other classes
                if isinstance(table_args, dict):
                    table_kw = dict(table_args)
                elif isinstance(table_args, tuple):
                    if isinstance(table_args[-1], dict):
                        args = table_args[0:-1]
                        table_kw = dict(table_args[-1])
                    else:
                        args = table_args

            autoload_with = clsdict_view.get("__autoload_with__")
            if autoload_with:
                table_kw["autoload_with"] = autoload_with

            autoload = clsdict_view.get("__autoload__")
            if autoload:
                table_kw["autoload"] = True

            # columns without a recorded sort order sort as 0
            sorted_columns = sorted(
                declared_columns,
                key=lambda c: column_ordering.get(c, 0),
            )
            table = self.set_cls_attribute(
                "__table__",
                table_cls(
                    tablename,
                    self._metadata_for_cls(manager),
                    *sorted_columns,
                    *args,
                    **table_kw,
                ),
            )
    else:
        if table is None:
            if self.table_fn:
                table = self.set_cls_attribute(
                    "__table__", self.table_fn()
                )
            else:
                table = cls_as_Decl.__table__
        if declared_columns:
            for c in declared_columns:
                if not table.c.contains_column(c):
                    raise exc.ArgumentError(
                        "Can't add additional column %r when "
                        "specifying __table__" % c.key
                    )

    self.local_table = table
1762
1763 def _metadata_for_cls(self, manager: ClassManager[Any]) -> MetaData:
1764 meta: Optional[MetaData] = getattr(self.cls, "metadata", None)
1765 if meta is not None:
1766 return meta
1767 else:
1768 return manager.registry.metadata
1769
def _setup_inheriting_mapper(self, mapper_kw: _MapperKwArgs) -> None:
    """Work out which class, if any, this mapping inherits from.

    An explicit ``inherits`` entry in ``mapper_kw`` wins (a ``Mapper`` is
    reduced to its ``class_``).  Otherwise the direct bases of the class
    are searched.  The result is stored in ``self.inherits``.
    ``self.single`` is set to ``True`` when the class has neither a
    ``__table__`` in its dict view nor a table name.

    :raises sqlalchemy.exc.InvalidRequestError: if more than one distinct
     base qualifies as a superclass to inherit from.
    """
    cls = self.cls
    inherits = mapper_kw.get("inherits", None)

    if inherits is None:
        # classical mappings are searched too, so also look for multiple
        # mapped bases and raise an error for those
        candidates = []
        for base in cls.__bases__:
            resolved = _resolve_for_abstract_or_classical(base)
            if (
                resolved is not None
                and _is_supercls_for_inherits(resolved)
                and resolved not in candidates
            ):
                candidates.append(resolved)

        if len(candidates) > 1:
            raise exc.InvalidRequestError(
                "Class %s has multiple mapped bases: %r"
                % (cls, candidates)
            )
        if candidates:
            inherits = candidates[0]
    elif isinstance(inherits, Mapper):
        inherits = inherits.class_

    self.inherits = inherits

    if "__table__" not in self.clsdict_view and self.tablename is None:
        self.single = True
1802
def _setup_inheriting_columns(self, mapper_kw: _MapperKwArgs) -> None:
    """Validate table setup and, if table-less, extend the parent table.

    A class with no local table must either inherit from a mapped class
    or carry ``__no_table__``.  For an inheriting class with no local
    table, each declared column is appended to the table of the class it
    inherits from.

    :raises sqlalchemy.exc.InvalidRequestError: if there is no table, no
     inherited class and no ``__no_table__`` marker.
    :raises sqlalchemy.exc.ArgumentError: for table arguments, primary
     key columns or conflicting column names on a table-less subclass,
     or if the superclass is not mapped to a ``Table``.
    """
    local_table = self.local_table
    cls = self.cls
    table_args = self.table_args
    declared_columns = self.declared_columns

    if (
        local_table is None
        and self.inherits is None
        and not _get_immediate_cls_attr(cls, "__no_table__")
    ):
        raise exc.InvalidRequestError(
            "Class %r does not have a __table__ or __tablename__ "
            "specified and does not inherit from an existing "
            "table-mapped class." % cls
        )

    if not self.inherits:
        return

    parent_info = _declared_mapping_info(self.inherits)
    assert parent_info is not None
    parent_table = parent_info.local_table
    parent_selectable = parent_info.persist_selectable

    if local_table is not None:
        return

    # from here on: inheriting class without a table of its own, which
    # therefore may not carry table args
    if table_args:
        raise exc.ArgumentError(
            "Can't place __table_args__ on an inherited class "
            "with no table."
        )

    # columns declared here are added to the inherited table, which
    # therefore has to be a Table
    if declared_columns and not isinstance(parent_table, Table):
        raise exc.ArgumentError(
            f"Can't declare columns on single-table-inherited "
            f"subclass {self.cls}; superclass {self.inherits} "
            "is not mapped to a Table"
        )

    for column in declared_columns:
        assert parent_table is not None
        if column.name in parent_table.c:
            existing = parent_table.c[column.name]
            if existing is column:
                continue
            raise exc.ArgumentError(
                f"Column '{column}' on class {cls.__name__} "
                f"conflicts with existing column "
                f"'{existing}'. If using "
                f"Declarative, consider using the "
                "use_existing_column parameter of mapped_column() "
                "to resolve conflicts."
            )
        if column.primary_key:
            raise exc.ArgumentError(
                "Can't place primary key columns on an inherited "
                "class with no table."
            )

        if TYPE_CHECKING:
            assert isinstance(parent_table, Table)

        parent_table.append_column(column)
        if (
            parent_selectable is not None
            and parent_selectable is not parent_table
        ):
            parent_selectable._refresh_for_new_column(column)
1874
def _prepare_mapper_arguments(self, mapper_kw: _MapperKwArgs) -> None:
    """Assemble the keyword arguments for the ``Mapper`` constructor.

    Starts from the result of ``self.mapper_args_fn()`` (or an empty
    dict), updates it with ``mapper_kw``, substitutes column copies for
    original columns, validates and fills in ``inherits``, computes
    ``exclude_properties`` for non-concrete inheritance, and stores a
    copy of the result, with ``"properties"`` added, in
    ``self.mapper_args``.

    :param mapper_kw: additional mapper keyword arguments; these take
     precedence over what ``self.mapper_args_fn()`` returns.
    :raises sqlalchemy.exc.InvalidRequestError: if an ``inherits``
     argument is given that does not match ``self.inherits``.
    """
    properties = self.properties

    if self.mapper_args_fn:
        mapper_args = self.mapper_args_fn()
    else:
        mapper_args = {}

    # NOTE: this updates the dictionary returned by mapper_args_fn()
    # in place
    if mapper_kw:
        mapper_args.update(mapper_kw)

    if "properties" in mapper_args:
        # work on a copy so self.properties itself is not altered by
        # the explicitly passed properties
        properties = dict(properties)
        properties.update(mapper_args["properties"])

    # make sure that column copies are used rather
    # than the original columns from any mixins
    for k in ("version_id_col", "polymorphic_on"):
        if k in mapper_args:
            v = mapper_args[k]
            mapper_args[k] = self.column_copies.get(v, v)

    if "primary_key" in mapper_args:
        mapper_args["primary_key"] = [
            self.column_copies.get(v, v)
            for v in util.to_list(mapper_args["primary_key"])
        ]

    if "inherits" in mapper_args:
        inherits_arg = mapper_args["inherits"]
        # compare by class; a Mapper is reduced to the class it maps
        if isinstance(inherits_arg, Mapper):
            inherits_arg = inherits_arg.class_

        if inherits_arg is not self.inherits:
            raise exc.InvalidRequestError(
                "mapper inherits argument given for non-inheriting "
                "class %s" % (mapper_args["inherits"])
            )

    if self.inherits:
        mapper_args["inherits"] = self.inherits

    if self.inherits and not mapper_args.get("concrete", False):
        # note the superclass is expected to have a Mapper assigned and
        # not be a deferred config, as this is called within map()
        inherited_mapper = class_mapper(self.inherits, False)
        inherited_table = inherited_mapper.local_table

        # single or joined inheritance
        # exclude any cols on the inherited table which are
        # not mapped on the parent class, to avoid
        # mapping columns specific to sibling/nephew classes
        if "exclude_properties" not in mapper_args:
            mapper_args["exclude_properties"] = exclude_properties = {
                c.key
                for c in inherited_table.c
                if c not in inherited_mapper._columntoproperty
            }.union(inherited_mapper.exclude_properties or ())
            # columns declared on this class are never excluded
            exclude_properties.difference_update(
                [c.key for c in self.declared_columns]
            )

        # look through columns in the current mapper that
        # are keyed to a propname different than the colname
        # (if names were the same, we'd have popped it out above,
        # in which case the mapper makes this combination).
        # See if the superclass has a similar column property.
        # If so, join them together.
        for k, col in list(properties.items()):
            if not isinstance(col, expression.ColumnElement):
                continue
            if k in inherited_mapper._props:
                p = inherited_mapper._props[k]
                if isinstance(p, ColumnProperty):
                    # note here we place the subclass column
                    # first. See [ticket:1892] for background.
                    properties[k] = [col] + p.columns
    # store a shallow copy so that later changes to mapper_args do not
    # affect what was recorded
    result_mapper_args = mapper_args.copy()
    result_mapper_args["properties"] = properties
    self.mapper_args = result_mapper_args
1955
def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]:
    """Construct the mapper for ``self.cls`` and set it as ``__mapper__``.

    The mapper class is ``Mapper`` unless the class being mapped has a
    ``__mapper_cls__`` attribute, in which case that is used instead.

    :return: the value returned by ``self.set_cls_attribute``.
    """
    self._prepare_mapper_arguments(mapper_kw)

    if not hasattr(self.cls, "__mapper_cls__"):
        mapper_factory = Mapper
    else:
        mapper_factory = cast(
            "Type[Mapper[Any]]",
            util.unbound_method_to_callable(
                self.cls.__mapper_cls__  # type: ignore
            ),
        )

    new_mapper = mapper_factory(
        self.cls, self.local_table, **self.mapper_args
    )
    return self.set_cls_attribute("__mapper__", new_mapper)
1972
1973
@util.preload_module("sqlalchemy.orm.decl_api")
def _as_dc_declaredattr(
    field_metadata: Mapping[str, Any], sa_dataclass_metadata_key: str
) -> Any:
    """Return the SQLAlchemy entry from a dataclass field's metadata.

    A callable entry that is not already a ``declared_attr`` is wrapped
    in a new ``declared_attr``; anything else is returned unchanged.
    """
    # field.metadata is immutable, so the wrapper can't be written back;
    # a fresh ad-hoc declared_attr is produced on each call instead
    decl_api = util.preloaded.orm_decl_api
    entry = field_metadata[sa_dataclass_metadata_key]
    if not callable(entry) or isinstance(entry, decl_api.declared_attr):
        return entry
    return decl_api.declared_attr(entry)
1987
1988
class _DeferredMapperConfig(_ClassScanMapperConfig):
    """Mapper configuration whose mapping step is postponed.

    Instances register themselves in the class-level ``_configs``
    dictionary, keyed by a weak reference to the class being configured,
    and are removed from it again when :meth:`map` runs or when the
    class is garbage collected.
    """

    _cls: weakref.ref[Type[Any]]

    is_deferred = True

    # registry of pending configurations, keyed by weakref to the class
    _configs: util.OrderedDict[
        weakref.ref[Type[Any]], _DeferredMapperConfig
    ] = util.OrderedDict()

    def _early_mapping(self, mapper_kw: _MapperKwArgs) -> None:
        """Do nothing; mapping happens only when :meth:`map` is called."""
        pass

    @property
    def cls(self) -> Type[Any]:
        """The class being configured, dereferenced from the weakref."""
        return self._cls()  # type: ignore

    @cls.setter
    def cls(self, class_: Type[Any]) -> None:
        # the weakref callback drops the registry entry once the class
        # goes away
        ref = weakref.ref(class_, self._remove_config_cls)
        self._cls = ref
        self._configs[ref] = self

    @classmethod
    def _remove_config_cls(cls, ref: weakref.ref[Type[Any]]) -> None:
        """Discard the registry entry for ``ref``, if there is one."""
        cls._configs.pop(ref, None)

    @classmethod
    def has_cls(cls, class_: Type[Any]) -> bool:
        """Return True if a deferred config is registered for ``class_``."""
        # only real types are looked up through a weakref
        if not isinstance(class_, type):
            return False
        return weakref.ref(class_) in cls._configs

    @classmethod
    def raise_unmapped_for_cls(cls, class_: Type[Any]) -> NoReturn:
        """Raise an error stating that ``class_`` is not mapped yet.

        If the class provides ``_sa_raise_deferred_config`` that hook is
        called first.
        """
        if hasattr(class_, "_sa_raise_deferred_config"):
            class_._sa_raise_deferred_config()

        raise orm_exc.UnmappedClassError(
            class_,
            msg=(
                f"Class {orm_exc._safe_cls_name(class_)} has a deferred "
                "mapping on it. It is not yet usable as a mapped class."
            ),
        )

    @classmethod
    def config_for_cls(cls, class_: Type[Any]) -> _DeferredMapperConfig:
        """Return the deferred config registered for ``class_``."""
        return cls._configs[weakref.ref(class_)]

    @classmethod
    def classes_for_base(
        cls, base_cls: Type[Any], sort: bool = True
    ) -> List[_DeferredMapperConfig]:
        """Return the deferred configs whose class subclasses ``base_cls``.

        With ``sort`` true the result is ordered by
        ``topological.sort`` using (direct base config, subclass config)
        pairs as the dependency tuples.
        """
        matching = [
            config
            for config, target in [
                (config, config.cls) for config in cls._configs.values()
            ]
            if target is not None and issubclass(target, base_cls)
        ]

        if not sort:
            return matching

        config_by_class = {config.cls: config for config in matching}

        edges: List[Tuple[_DeferredMapperConfig, _DeferredMapperConfig]] = []
        for klass in config_by_class:
            for parent in klass.__bases__:
                if parent in config_by_class:
                    edges.append(
                        (config_by_class[parent], config_by_class[klass])
                    )
        return list(topological.sort(edges, matching))

    def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]:
        """Remove this config from the registry, then map as usual."""
        self._configs.pop(self._cls, None)
        return super().map(mapper_kw)
2063
2064
2065def _add_attribute(
2066 cls: Type[Any], key: str, value: MapperProperty[Any]
2067) -> None:
2068 """add an attribute to an existing declarative class.
2069
2070 This runs through the logic to determine MapperProperty,
2071 adds it to the Mapper, adds a column to the mapped Table, etc.
2072
2073 """
2074
2075 if "__mapper__" in cls.__dict__:
2076 mapped_cls = cast("MappedClassProtocol[Any]", cls)
2077
2078 def _table_or_raise(mc: MappedClassProtocol[Any]) -> Table:
2079 if isinstance(mc.__table__, Table):
2080 return mc.__table__
2081 raise exc.InvalidRequestError(
2082 f"Cannot add a new attribute to mapped class {mc.__name__!r} "
2083 "because it's not mapped against a table."
2084 )
2085
2086 if isinstance(value, Column):
2087 _undefer_column_name(key, value)
2088 _table_or_raise(mapped_cls).append_column(
2089 value, replace_existing=True
2090 )
2091 mapped_cls.__mapper__.add_property(key, value)
2092 elif isinstance(value, _MapsColumns):
2093 mp = value.mapper_property_to_assign
2094 for col, _ in value.columns_to_assign:
2095 _undefer_column_name(key, col)
2096 _table_or_raise(mapped_cls).append_column(
2097 col, replace_existing=True
2098 )
2099 if not mp:
2100 mapped_cls.__mapper__.add_property(key, col)
2101 if mp:
2102 mapped_cls.__mapper__.add_property(key, mp)
2103 elif isinstance(value, MapperProperty):
2104 mapped_cls.__mapper__.add_property(key, value)
2105 elif isinstance(value, QueryableAttribute) and value.key != key:
2106 # detect a QueryableAttribute that's already mapped being
2107 # assigned elsewhere in userland, turn into a synonym()
2108 value = SynonymProperty(value.key)
2109 mapped_cls.__mapper__.add_property(key, value)
2110 else:
2111 type.__setattr__(cls, key, value)
2112 mapped_cls.__mapper__._expire_memoizations()
2113 else:
2114 type.__setattr__(cls, key, value)
2115
2116
2117def _del_attribute(cls: Type[Any], key: str) -> None:
2118 if (
2119 "__mapper__" in cls.__dict__
2120 and key in cls.__dict__
2121 and not cast(
2122 "MappedClassProtocol[Any]", cls
2123 ).__mapper__._dispose_called
2124 ):
2125 value = cls.__dict__[key]
2126 if isinstance(
2127 value, (Column, _MapsColumns, MapperProperty, QueryableAttribute)
2128 ):
2129 raise NotImplementedError(
2130 "Can't un-map individual mapped attributes on a mapped class."
2131 )
2132 else:
2133 type.__delattr__(cls, key)
2134 cast(
2135 "MappedClassProtocol[Any]", cls
2136 ).__mapper__._expire_memoizations()
2137 else:
2138 type.__delattr__(cls, key)
2139
2140
2141def _declarative_constructor(self: Any, **kwargs: Any) -> None:
2142 """A simple constructor that allows initialization from kwargs.
2143
2144 Sets attributes on the constructed instance using the names and
2145 values in ``kwargs``.
2146
2147 Only keys that are present as
2148 attributes of the instance's class are allowed. These could be,
2149 for example, any mapped columns or relationships.
2150 """
2151 cls_ = type(self)
2152 for k in kwargs:
2153 if not hasattr(cls_, k):
2154 raise TypeError(
2155 "%r is an invalid keyword argument for %s" % (k, cls_.__name__)
2156 )
2157 setattr(self, k, kwargs[k])
2158
2159
2160_declarative_constructor.__name__ = "__init__"
2161
2162
2163def _undefer_column_name(key: str, column: Column[Any]) -> None:
2164 if column.key is None:
2165 column.key = key
2166 if column.name is None:
2167 column.name = key