1# orm/decl_base.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Internal implementation for declarative."""
9
10from __future__ import annotations
11
12import collections
13import dataclasses
14import re
15from typing import Any
16from typing import Callable
17from typing import cast
18from typing import Dict
19from typing import Iterable
20from typing import List
21from typing import Mapping
22from typing import NamedTuple
23from typing import NoReturn
24from typing import Optional
25from typing import Protocol
26from typing import Sequence
27from typing import Tuple
28from typing import Type
29from typing import TYPE_CHECKING
30from typing import TypedDict
31from typing import TypeVar
32from typing import Union
33import weakref
34
35from . import attributes
36from . import clsregistry
37from . import exc as orm_exc
38from . import instrumentation
39from . import mapperlib
40from ._typing import _O
41from ._typing import attr_is_internal_proxy
42from .attributes import InstrumentedAttribute
43from .attributes import QueryableAttribute
44from .base import _is_mapped_class
45from .base import InspectionAttr
46from .descriptor_props import CompositeProperty
47from .descriptor_props import SynonymProperty
48from .interfaces import _AttributeOptions
49from .interfaces import _DCAttributeOptions
50from .interfaces import _IntrospectsAnnotations
51from .interfaces import _MappedAttribute
52from .interfaces import _MapsColumns
53from .interfaces import MapperProperty
54from .mapper import Mapper
55from .properties import ColumnProperty
56from .properties import MappedColumn
57from .util import _extract_mapped_subtype
58from .util import _is_mapped_annotation
59from .util import class_mapper
60from .util import de_stringify_annotation
61from .. import event
62from .. import exc
63from .. import util
64from ..sql import expression
65from ..sql.base import _NoArg
66from ..sql.schema import Column
67from ..sql.schema import Table
68from ..util import topological
69from ..util.typing import _AnnotationScanType
70from ..util.typing import is_fwd_ref
71from ..util.typing import is_literal
72from ..util.typing import typing_get_args
73
74if TYPE_CHECKING:
75 from ._typing import _ClassDict
76 from ._typing import _RegistryType
77 from .base import Mapped
78 from .decl_api import declared_attr
79 from .instrumentation import ClassManager
80 from ..sql.elements import NamedColumn
81 from ..sql.schema import MetaData
82 from ..sql.selectable import FromClause
83
84_T = TypeVar("_T", bound=Any)
85
86_MapperKwArgs = Mapping[str, Any]
87_TableArgsType = Union[Tuple[Any, ...], Dict[str, Any]]
88
89
90class MappedClassProtocol(Protocol[_O]):
91 """A protocol representing a SQLAlchemy mapped class.
92
93 The protocol is generic on the type of class, use
94 ``MappedClassProtocol[Any]`` to allow any mapped class.
95 """
96
97 __name__: str
98 __mapper__: Mapper[_O]
99 __table__: FromClause
100
101 def __call__(self, **kw: Any) -> _O: ...
102
103
104class _DeclMappedClassProtocol(MappedClassProtocol[_O], Protocol):
105 "Internal more detailed version of ``MappedClassProtocol``."
106 metadata: MetaData
107 __tablename__: str
108 __mapper_args__: _MapperKwArgs
109 __table_args__: Optional[_TableArgsType]
110
111 _sa_apply_dc_transforms: Optional[_DataclassArguments]
112
113 def __declare_first__(self) -> None: ...
114
115 def __declare_last__(self) -> None: ...
116
117
118class _DataclassArguments(TypedDict):
119 init: Union[_NoArg, bool]
120 repr: Union[_NoArg, bool]
121 eq: Union[_NoArg, bool]
122 order: Union[_NoArg, bool]
123 unsafe_hash: Union[_NoArg, bool]
124 match_args: Union[_NoArg, bool]
125 kw_only: Union[_NoArg, bool]
126 dataclass_callable: Union[_NoArg, Callable[..., Type[Any]]]
127
128
129def _declared_mapping_info(
130 cls: Type[Any],
131) -> Optional[Union[_DeferredMapperConfig, Mapper[Any]]]:
132 # deferred mapping
133 if _DeferredMapperConfig.has_cls(cls):
134 return _DeferredMapperConfig.config_for_cls(cls)
135 # regular mapping
136 elif _is_mapped_class(cls):
137 return class_mapper(cls, configure=False)
138 else:
139 return None
140
141
142def _is_supercls_for_inherits(cls: Type[Any]) -> bool:
143 """return True if this class will be used as a superclass to set in
144 'inherits'.
145
146 This includes deferred mapper configs that aren't mapped yet, however does
147 not include classes with _sa_decl_prepare_nocascade (e.g.
148 ``AbstractConcreteBase``); these concrete-only classes are not set up as
149 "inherits" until after mappers are configured using
150 mapper._set_concrete_base()
151
152 """
153 if _DeferredMapperConfig.has_cls(cls):
154 return not _get_immediate_cls_attr(
155 cls, "_sa_decl_prepare_nocascade", strict=True
156 )
157 # regular mapping
158 elif _is_mapped_class(cls):
159 return True
160 else:
161 return False
162
163
164def _resolve_for_abstract_or_classical(cls: Type[Any]) -> Optional[Type[Any]]:
165 if cls is object:
166 return None
167
168 sup: Optional[Type[Any]]
169
170 if cls.__dict__.get("__abstract__", False):
171 for base_ in cls.__bases__:
172 sup = _resolve_for_abstract_or_classical(base_)
173 if sup is not None:
174 return sup
175 else:
176 return None
177 else:
178 clsmanager = _dive_for_cls_manager(cls)
179
180 if clsmanager:
181 return clsmanager.class_
182 else:
183 return cls
184
185
186def _get_immediate_cls_attr(
187 cls: Type[Any], attrname: str, strict: bool = False
188) -> Optional[Any]:
189 """return an attribute of the class that is either present directly
190 on the class, e.g. not on a superclass, or is from a superclass but
191 this superclass is a non-mapped mixin, that is, not a descendant of
192 the declarative base and is also not classically mapped.
193
194 This is used to detect attributes that indicate something about
195 a mapped class independently from any mapped classes that it may
196 inherit from.
197
198 """
199
200 # the rules are different for this name than others,
201 # make sure we've moved it out. transitional
202 assert attrname != "__abstract__"
203
204 if not issubclass(cls, object):
205 return None
206
207 if attrname in cls.__dict__:
208 return getattr(cls, attrname)
209
210 for base in cls.__mro__[1:]:
211 _is_classical_inherits = _dive_for_cls_manager(base) is not None
212
213 if attrname in base.__dict__ and (
214 base is cls
215 or (
216 (base in cls.__bases__ if strict else True)
217 and not _is_classical_inherits
218 )
219 ):
220 return getattr(base, attrname)
221 else:
222 return None
223
224
225def _dive_for_cls_manager(cls: Type[_O]) -> Optional[ClassManager[_O]]:
226 # because the class manager registration is pluggable,
227 # we need to do the search for every class in the hierarchy,
228 # rather than just a simple "cls._sa_class_manager"
229
230 for base in cls.__mro__:
231 manager: Optional[ClassManager[_O]] = attributes.opt_manager_of_class(
232 base
233 )
234 if manager:
235 return manager
236 return None
237
238
239def _as_declarative(
240 registry: _RegistryType, cls: Type[Any], dict_: _ClassDict
241) -> Optional[_MapperConfig]:
242 # declarative scans the class for attributes. no table or mapper
243 # args passed separately.
244 return _MapperConfig.setup_mapping(registry, cls, dict_, None, {})
245
246
247def _mapper(
248 registry: _RegistryType,
249 cls: Type[_O],
250 table: Optional[FromClause],
251 mapper_kw: _MapperKwArgs,
252) -> Mapper[_O]:
253 _ImperativeMapperConfig(registry, cls, table, mapper_kw)
254 return cast("MappedClassProtocol[_O]", cls).__mapper__
255
256
257@util.preload_module("sqlalchemy.orm.decl_api")
258def _is_declarative_props(obj: Any) -> bool:
259 _declared_attr_common = util.preloaded.orm_decl_api._declared_attr_common
260
261 return isinstance(obj, (_declared_attr_common, util.classproperty))
262
263
264def _check_declared_props_nocascade(
265 obj: Any, name: str, cls: Type[_O]
266) -> bool:
267 if _is_declarative_props(obj):
268 if getattr(obj, "_cascading", False):
269 util.warn(
270 "@declared_attr.cascading is not supported on the %s "
271 "attribute on class %s. This attribute invokes for "
272 "subclasses in any case." % (name, cls)
273 )
274 return True
275 else:
276 return False
277
278
279class _MapperConfig:
280 __slots__ = (
281 "cls",
282 "classname",
283 "properties",
284 "declared_attr_reg",
285 "__weakref__",
286 )
287
288 cls: Type[Any]
289 classname: str
290 properties: util.OrderedDict[
291 str,
292 Union[
293 Sequence[NamedColumn[Any]], NamedColumn[Any], MapperProperty[Any]
294 ],
295 ]
296 declared_attr_reg: Dict[declared_attr[Any], Any]
297
298 @classmethod
299 def setup_mapping(
300 cls,
301 registry: _RegistryType,
302 cls_: Type[_O],
303 dict_: _ClassDict,
304 table: Optional[FromClause],
305 mapper_kw: _MapperKwArgs,
306 ) -> Optional[_MapperConfig]:
307 manager = attributes.opt_manager_of_class(cls)
308 if manager and manager.class_ is cls_:
309 raise exc.InvalidRequestError(
310 f"Class {cls!r} already has been instrumented declaratively"
311 )
312
313 if cls_.__dict__.get("__abstract__", False):
314 return None
315
316 defer_map = _get_immediate_cls_attr(
317 cls_, "_sa_decl_prepare_nocascade", strict=True
318 ) or hasattr(cls_, "_sa_decl_prepare")
319
320 if defer_map:
321 return _DeferredMapperConfig(
322 registry, cls_, dict_, table, mapper_kw
323 )
324 else:
325 return _ClassScanMapperConfig(
326 registry, cls_, dict_, table, mapper_kw
327 )
328
329 def __init__(
330 self,
331 registry: _RegistryType,
332 cls_: Type[Any],
333 mapper_kw: _MapperKwArgs,
334 ):
335 self.cls = util.assert_arg_type(cls_, type, "cls_")
336 self.classname = cls_.__name__
337 self.properties = util.OrderedDict()
338 self.declared_attr_reg = {}
339
340 if not mapper_kw.get("non_primary", False):
341 instrumentation.register_class(
342 self.cls,
343 finalize=False,
344 registry=registry,
345 declarative_scan=self,
346 init_method=registry.constructor,
347 )
348 else:
349 manager = attributes.opt_manager_of_class(self.cls)
350 if not manager or not manager.is_mapped:
351 raise exc.InvalidRequestError(
352 "Class %s has no primary mapper configured. Configure "
353 "a primary mapper first before setting up a non primary "
354 "Mapper." % self.cls
355 )
356
357 def set_cls_attribute(self, attrname: str, value: _T) -> _T:
358 manager = instrumentation.manager_of_class(self.cls)
359 manager.install_member(attrname, value)
360 return value
361
362 def map(self, mapper_kw: _MapperKwArgs = ...) -> Mapper[Any]:
363 raise NotImplementedError()
364
365 def _early_mapping(self, mapper_kw: _MapperKwArgs) -> None:
366 self.map(mapper_kw)
367
368
369class _ImperativeMapperConfig(_MapperConfig):
370 __slots__ = ("local_table", "inherits")
371
372 def __init__(
373 self,
374 registry: _RegistryType,
375 cls_: Type[_O],
376 table: Optional[FromClause],
377 mapper_kw: _MapperKwArgs,
378 ):
379 super().__init__(registry, cls_, mapper_kw)
380
381 self.local_table = self.set_cls_attribute("__table__", table)
382
383 with mapperlib._CONFIGURE_MUTEX:
384 if not mapper_kw.get("non_primary", False):
385 clsregistry.add_class(
386 self.classname, self.cls, registry._class_registry
387 )
388
389 self._setup_inheritance(mapper_kw)
390
391 self._early_mapping(mapper_kw)
392
393 def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]:
394 mapper_cls = Mapper
395
396 return self.set_cls_attribute(
397 "__mapper__",
398 mapper_cls(self.cls, self.local_table, **mapper_kw),
399 )
400
401 def _setup_inheritance(self, mapper_kw: _MapperKwArgs) -> None:
402 cls = self.cls
403
404 inherits = mapper_kw.get("inherits", None)
405
406 if inherits is None:
407 # since we search for classical mappings now, search for
408 # multiple mapped bases as well and raise an error.
409 inherits_search = []
410 for base_ in cls.__bases__:
411 c = _resolve_for_abstract_or_classical(base_)
412 if c is None:
413 continue
414
415 if _is_supercls_for_inherits(c) and c not in inherits_search:
416 inherits_search.append(c)
417
418 if inherits_search:
419 if len(inherits_search) > 1:
420 raise exc.InvalidRequestError(
421 "Class %s has multiple mapped bases: %r"
422 % (cls, inherits_search)
423 )
424 inherits = inherits_search[0]
425 elif isinstance(inherits, Mapper):
426 inherits = inherits.class_
427
428 self.inherits = inherits
429
430
431class _CollectedAnnotation(NamedTuple):
432 raw_annotation: _AnnotationScanType
433 mapped_container: Optional[Type[Mapped[Any]]]
434 extracted_mapped_annotation: Union[_AnnotationScanType, str]
435 is_dataclass: bool
436 attr_value: Any
437 originating_module: str
438 originating_class: Type[Any]
439
440
441class _ClassScanMapperConfig(_MapperConfig):
442 __slots__ = (
443 "registry",
444 "clsdict_view",
445 "collected_attributes",
446 "collected_annotations",
447 "local_table",
448 "persist_selectable",
449 "declared_columns",
450 "column_ordering",
451 "column_copies",
452 "table_args",
453 "tablename",
454 "mapper_args",
455 "mapper_args_fn",
456 "table_fn",
457 "inherits",
458 "single",
459 "allow_dataclass_fields",
460 "dataclass_setup_arguments",
461 "is_dataclass_prior_to_mapping",
462 "allow_unmapped_annotations",
463 )
464
465 is_deferred = False
466 registry: _RegistryType
467 clsdict_view: _ClassDict
468 collected_annotations: Dict[str, _CollectedAnnotation]
469 collected_attributes: Dict[str, Any]
470 local_table: Optional[FromClause]
471 persist_selectable: Optional[FromClause]
472 declared_columns: util.OrderedSet[Column[Any]]
473 column_ordering: Dict[Column[Any], int]
474 column_copies: Dict[
475 Union[MappedColumn[Any], Column[Any]],
476 Union[MappedColumn[Any], Column[Any]],
477 ]
478 tablename: Optional[str]
479 mapper_args: Mapping[str, Any]
480 table_args: Optional[_TableArgsType]
481 mapper_args_fn: Optional[Callable[[], Dict[str, Any]]]
482 inherits: Optional[Type[Any]]
483 single: bool
484
485 is_dataclass_prior_to_mapping: bool
486 allow_unmapped_annotations: bool
487
488 dataclass_setup_arguments: Optional[_DataclassArguments]
489 """if the class has SQLAlchemy native dataclass parameters, where
490 we will turn the class into a dataclass within the declarative mapping
491 process.
492
493 """
494
495 allow_dataclass_fields: bool
496 """if true, look for dataclass-processed Field objects on the target
497 class as well as superclasses and extract ORM mapping directives from
498 the "metadata" attribute of each Field.
499
500 if False, dataclass fields can still be used, however they won't be
501 mapped.
502
503 """
504
505 def __init__(
506 self,
507 registry: _RegistryType,
508 cls_: Type[_O],
509 dict_: _ClassDict,
510 table: Optional[FromClause],
511 mapper_kw: _MapperKwArgs,
512 ):
513 # grab class dict before the instrumentation manager has been added.
514 # reduces cycles
515 self.clsdict_view = (
516 util.immutabledict(dict_) if dict_ else util.EMPTY_DICT
517 )
518 super().__init__(registry, cls_, mapper_kw)
519 self.registry = registry
520 self.persist_selectable = None
521
522 self.collected_attributes = {}
523 self.collected_annotations = {}
524 self.declared_columns = util.OrderedSet()
525 self.column_ordering = {}
526 self.column_copies = {}
527 self.single = False
528 self.dataclass_setup_arguments = dca = getattr(
529 self.cls, "_sa_apply_dc_transforms", None
530 )
531
532 self.allow_unmapped_annotations = getattr(
533 self.cls, "__allow_unmapped__", False
534 ) or bool(self.dataclass_setup_arguments)
535
536 self.is_dataclass_prior_to_mapping = cld = dataclasses.is_dataclass(
537 cls_
538 )
539
540 sdk = _get_immediate_cls_attr(cls_, "__sa_dataclass_metadata_key__")
541
542 # we don't want to consume Field objects from a not-already-dataclass.
543 # the Field objects won't have their "name" or "type" populated,
544 # and while it seems like we could just set these on Field as we
545 # read them, Field is documented as "user read only" and we need to
546 # stay far away from any off-label use of dataclasses APIs.
547 if (not cld or dca) and sdk:
548 raise exc.InvalidRequestError(
549 "SQLAlchemy mapped dataclasses can't consume mapping "
550 "information from dataclass.Field() objects if the immediate "
551 "class is not already a dataclass."
552 )
553
554 # if already a dataclass, and __sa_dataclass_metadata_key__ present,
555 # then also look inside of dataclass.Field() objects yielded by
556 # dataclasses.get_fields(cls) when scanning for attributes
557 self.allow_dataclass_fields = bool(sdk and cld)
558
559 self._setup_declared_events()
560
561 self._scan_attributes()
562
563 self._setup_dataclasses_transforms()
564
565 with mapperlib._CONFIGURE_MUTEX:
566 clsregistry.add_class(
567 self.classname, self.cls, registry._class_registry
568 )
569
570 self._setup_inheriting_mapper(mapper_kw)
571
572 self._extract_mappable_attributes()
573
574 self._extract_declared_columns()
575
576 self._setup_table(table)
577
578 self._setup_inheriting_columns(mapper_kw)
579
580 self._early_mapping(mapper_kw)
581
582 def _setup_declared_events(self) -> None:
583 if _get_immediate_cls_attr(self.cls, "__declare_last__"):
584
585 @event.listens_for(Mapper, "after_configured")
586 def after_configured() -> None:
587 cast(
588 "_DeclMappedClassProtocol[Any]", self.cls
589 ).__declare_last__()
590
591 if _get_immediate_cls_attr(self.cls, "__declare_first__"):
592
593 @event.listens_for(Mapper, "before_configured")
594 def before_configured() -> None:
595 cast(
596 "_DeclMappedClassProtocol[Any]", self.cls
597 ).__declare_first__()
598
599 def _cls_attr_override_checker(
600 self, cls: Type[_O]
601 ) -> Callable[[str, Any], bool]:
602 """Produce a function that checks if a class has overridden an
603 attribute, taking SQLAlchemy-enabled dataclass fields into account.
604
605 """
606
607 if self.allow_dataclass_fields:
608 sa_dataclass_metadata_key = _get_immediate_cls_attr(
609 cls, "__sa_dataclass_metadata_key__"
610 )
611 else:
612 sa_dataclass_metadata_key = None
613
614 if not sa_dataclass_metadata_key:
615
616 def attribute_is_overridden(key: str, obj: Any) -> bool:
617 return getattr(cls, key, obj) is not obj
618
619 else:
620 all_datacls_fields = {
621 f.name: f.metadata[sa_dataclass_metadata_key]
622 for f in util.dataclass_fields(cls)
623 if sa_dataclass_metadata_key in f.metadata
624 }
625 local_datacls_fields = {
626 f.name: f.metadata[sa_dataclass_metadata_key]
627 for f in util.local_dataclass_fields(cls)
628 if sa_dataclass_metadata_key in f.metadata
629 }
630
631 absent = object()
632
633 def attribute_is_overridden(key: str, obj: Any) -> bool:
634 if _is_declarative_props(obj):
635 obj = obj.fget
636
637 # this function likely has some failure modes still if
638 # someone is doing a deep mixing of the same attribute
639 # name as plain Python attribute vs. dataclass field.
640
641 ret = local_datacls_fields.get(key, absent)
642 if _is_declarative_props(ret):
643 ret = ret.fget
644
645 if ret is obj:
646 return False
647 elif ret is not absent:
648 return True
649
650 all_field = all_datacls_fields.get(key, absent)
651
652 ret = getattr(cls, key, obj)
653
654 if ret is obj:
655 return False
656
657 # for dataclasses, this could be the
658 # 'default' of the field. so filter more specifically
659 # for an already-mapped InstrumentedAttribute
660 if ret is not absent and isinstance(
661 ret, InstrumentedAttribute
662 ):
663 return True
664
665 if all_field is obj:
666 return False
667 elif all_field is not absent:
668 return True
669
670 # can't find another attribute
671 return False
672
673 return attribute_is_overridden
674
675 _include_dunders = {
676 "__table__",
677 "__mapper_args__",
678 "__tablename__",
679 "__table_args__",
680 }
681
682 _match_exclude_dunders = re.compile(r"^(?:_sa_|__)")
683
684 def _cls_attr_resolver(
685 self, cls: Type[Any]
686 ) -> Callable[[], Iterable[Tuple[str, Any, Any, bool]]]:
687 """produce a function to iterate the "attributes" of a class
688 which we want to consider for mapping, adjusting for SQLAlchemy fields
689 embedded in dataclass fields.
690
691 """
692 cls_annotations = util.get_annotations(cls)
693
694 cls_vars = vars(cls)
695
696 _include_dunders = self._include_dunders
697 _match_exclude_dunders = self._match_exclude_dunders
698
699 names = [
700 n
701 for n in util.merge_lists_w_ordering(
702 list(cls_vars), list(cls_annotations)
703 )
704 if not _match_exclude_dunders.match(n) or n in _include_dunders
705 ]
706
707 if self.allow_dataclass_fields:
708 sa_dataclass_metadata_key: Optional[str] = _get_immediate_cls_attr(
709 cls, "__sa_dataclass_metadata_key__"
710 )
711 else:
712 sa_dataclass_metadata_key = None
713
714 if not sa_dataclass_metadata_key:
715
716 def local_attributes_for_class() -> (
717 Iterable[Tuple[str, Any, Any, bool]]
718 ):
719 return (
720 (
721 name,
722 cls_vars.get(name),
723 cls_annotations.get(name),
724 False,
725 )
726 for name in names
727 )
728
729 else:
730 dataclass_fields = {
731 field.name: field for field in util.local_dataclass_fields(cls)
732 }
733
734 fixed_sa_dataclass_metadata_key = sa_dataclass_metadata_key
735
736 def local_attributes_for_class() -> (
737 Iterable[Tuple[str, Any, Any, bool]]
738 ):
739 for name in names:
740 field = dataclass_fields.get(name, None)
741 if field and sa_dataclass_metadata_key in field.metadata:
742 yield field.name, _as_dc_declaredattr(
743 field.metadata, fixed_sa_dataclass_metadata_key
744 ), cls_annotations.get(field.name), True
745 else:
746 yield name, cls_vars.get(name), cls_annotations.get(
747 name
748 ), False
749
750 return local_attributes_for_class
751
752 def _scan_attributes(self) -> None:
753 cls = self.cls
754
755 cls_as_Decl = cast("_DeclMappedClassProtocol[Any]", cls)
756
757 clsdict_view = self.clsdict_view
758 collected_attributes = self.collected_attributes
759 column_copies = self.column_copies
760 _include_dunders = self._include_dunders
761 mapper_args_fn = None
762 table_args = inherited_table_args = None
763 table_fn = None
764 tablename = None
765 fixed_table = "__table__" in clsdict_view
766
767 attribute_is_overridden = self._cls_attr_override_checker(self.cls)
768
769 bases = []
770
771 for base in cls.__mro__:
772 # collect bases and make sure standalone columns are copied
773 # to be the column they will ultimately be on the class,
774 # so that declared_attr functions use the right columns.
775 # need to do this all the way up the hierarchy first
776 # (see #8190)
777
778 class_mapped = base is not cls and _is_supercls_for_inherits(base)
779
780 local_attributes_for_class = self._cls_attr_resolver(base)
781
782 if not class_mapped and base is not cls:
783 locally_collected_columns = self._produce_column_copies(
784 local_attributes_for_class,
785 attribute_is_overridden,
786 fixed_table,
787 base,
788 )
789 else:
790 locally_collected_columns = {}
791
792 bases.append(
793 (
794 base,
795 class_mapped,
796 local_attributes_for_class,
797 locally_collected_columns,
798 )
799 )
800
801 for (
802 base,
803 class_mapped,
804 local_attributes_for_class,
805 locally_collected_columns,
806 ) in bases:
807 # this transfer can also take place as we scan each name
808 # for finer-grained control of how collected_attributes is
809 # populated, as this is what impacts column ordering.
810 # however it's simpler to get it out of the way here.
811 collected_attributes.update(locally_collected_columns)
812
813 for (
814 name,
815 obj,
816 annotation,
817 is_dataclass_field,
818 ) in local_attributes_for_class():
819 if name in _include_dunders:
820 if name == "__mapper_args__":
821 check_decl = _check_declared_props_nocascade(
822 obj, name, cls
823 )
824 if not mapper_args_fn and (
825 not class_mapped or check_decl
826 ):
827 # don't even invoke __mapper_args__ until
828 # after we've determined everything about the
829 # mapped table.
830 # make a copy of it so a class-level dictionary
831 # is not overwritten when we update column-based
832 # arguments.
833 def _mapper_args_fn() -> Dict[str, Any]:
834 return dict(cls_as_Decl.__mapper_args__)
835
836 mapper_args_fn = _mapper_args_fn
837
838 elif name == "__tablename__":
839 check_decl = _check_declared_props_nocascade(
840 obj, name, cls
841 )
842 if not tablename and (not class_mapped or check_decl):
843 tablename = cls_as_Decl.__tablename__
844 elif name == "__table__":
845 check_decl = _check_declared_props_nocascade(
846 obj, name, cls
847 )
848 # if a @declared_attr using "__table__" is detected,
849 # wrap up a callable to look for "__table__" from
850 # the final concrete class when we set up a table.
851 # this was fixed by
852 # #11509, regression in 2.0 from version 1.4.
853 if check_decl and not table_fn:
854 # don't even invoke __table__ until we're ready
855 def _table_fn() -> FromClause:
856 return cls_as_Decl.__table__
857
858 table_fn = _table_fn
859
860 elif name == "__table_args__":
861 check_decl = _check_declared_props_nocascade(
862 obj, name, cls
863 )
864 if not table_args and (not class_mapped or check_decl):
865 table_args = cls_as_Decl.__table_args__
866 if not isinstance(
867 table_args, (tuple, dict, type(None))
868 ):
869 raise exc.ArgumentError(
870 "__table_args__ value must be a tuple, "
871 "dict, or None"
872 )
873 if base is not cls:
874 inherited_table_args = True
875 else:
876 # any other dunder names; should not be here
877 # as we have tested for all four names in
878 # _include_dunders
879 assert False
880 elif class_mapped:
881 if _is_declarative_props(obj) and not obj._quiet:
882 util.warn(
883 "Regular (i.e. not __special__) "
884 "attribute '%s.%s' uses @declared_attr, "
885 "but owning class %s is mapped - "
886 "not applying to subclass %s."
887 % (base.__name__, name, base, cls)
888 )
889
890 continue
891 elif base is not cls:
892 # we're a mixin, abstract base, or something that is
893 # acting like that for now.
894
895 if isinstance(obj, (Column, MappedColumn)):
896 # already copied columns to the mapped class.
897 continue
898 elif isinstance(obj, MapperProperty):
899 raise exc.InvalidRequestError(
900 "Mapper properties (i.e. deferred,"
901 "column_property(), relationship(), etc.) must "
902 "be declared as @declared_attr callables "
903 "on declarative mixin classes. For dataclass "
904 "field() objects, use a lambda:"
905 )
906 elif _is_declarative_props(obj):
907 # tried to get overloads to tell this to
908 # pylance, no luck
909 assert obj is not None
910
911 if obj._cascading:
912 if name in clsdict_view:
913 # unfortunately, while we can use the user-
914 # defined attribute here to allow a clean
915 # override, if there's another
916 # subclass below then it still tries to use
917 # this. not sure if there is enough
918 # information here to add this as a feature
919 # later on.
920 util.warn(
921 "Attribute '%s' on class %s cannot be "
922 "processed due to "
923 "@declared_attr.cascading; "
924 "skipping" % (name, cls)
925 )
926 collected_attributes[name] = column_copies[obj] = (
927 ret
928 ) = obj.__get__(obj, cls)
929 setattr(cls, name, ret)
930 else:
931 if is_dataclass_field:
932 # access attribute using normal class access
933 # first, to see if it's been mapped on a
934 # superclass. note if the dataclasses.field()
935 # has "default", this value can be anything.
936 ret = getattr(cls, name, None)
937
938 # so, if it's anything that's not ORM
939 # mapped, assume we should invoke the
940 # declared_attr
941 if not isinstance(ret, InspectionAttr):
942 ret = obj.fget()
943 else:
944 # access attribute using normal class access.
945 # if the declared attr already took place
946 # on a superclass that is mapped, then
947 # this is no longer a declared_attr, it will
948 # be the InstrumentedAttribute
949 ret = getattr(cls, name)
950
951 # correct for proxies created from hybrid_property
952 # or similar. note there is no known case that
953 # produces nested proxies, so we are only
954 # looking one level deep right now.
955
956 if (
957 isinstance(ret, InspectionAttr)
958 and attr_is_internal_proxy(ret)
959 and not isinstance(
960 ret.original_property, MapperProperty
961 )
962 ):
963 ret = ret.descriptor
964
965 collected_attributes[name] = column_copies[obj] = (
966 ret
967 )
968
969 if (
970 isinstance(ret, (Column, MapperProperty))
971 and ret.doc is None
972 ):
973 ret.doc = obj.__doc__
974
975 self._collect_annotation(
976 name,
977 obj._collect_return_annotation(),
978 base,
979 True,
980 obj,
981 )
982 elif _is_mapped_annotation(annotation, cls, base):
983 # Mapped annotation without any object.
984 # product_column_copies should have handled this.
985 # if future support for other MapperProperty,
986 # then test if this name is already handled and
987 # otherwise proceed to generate.
988 if not fixed_table:
989 assert (
990 name in collected_attributes
991 or attribute_is_overridden(name, None)
992 )
993 continue
994 else:
995 # here, the attribute is some other kind of
996 # property that we assume is not part of the
997 # declarative mapping. however, check for some
998 # more common mistakes
999 self._warn_for_decl_attributes(base, name, obj)
1000 elif is_dataclass_field and (
1001 name not in clsdict_view or clsdict_view[name] is not obj
1002 ):
1003 # here, we are definitely looking at the target class
1004 # and not a superclass. this is currently a
1005 # dataclass-only path. if the name is only
1006 # a dataclass field and isn't in local cls.__dict__,
1007 # put the object there.
1008 # assert that the dataclass-enabled resolver agrees
1009 # with what we are seeing
1010
1011 assert not attribute_is_overridden(name, obj)
1012
1013 if _is_declarative_props(obj):
1014 obj = obj.fget()
1015
1016 collected_attributes[name] = obj
1017 self._collect_annotation(
1018 name, annotation, base, False, obj
1019 )
1020 else:
1021 collected_annotation = self._collect_annotation(
1022 name, annotation, base, None, obj
1023 )
1024 is_mapped = (
1025 collected_annotation is not None
1026 and collected_annotation.mapped_container is not None
1027 )
1028 generated_obj = (
1029 collected_annotation.attr_value
1030 if collected_annotation is not None
1031 else obj
1032 )
1033 if obj is None and not fixed_table and is_mapped:
1034 collected_attributes[name] = (
1035 generated_obj
1036 if generated_obj is not None
1037 else MappedColumn()
1038 )
1039 elif name in clsdict_view:
1040 collected_attributes[name] = obj
1041 # else if the name is not in the cls.__dict__,
1042 # don't collect it as an attribute.
1043 # we will see the annotation only, which is meaningful
1044 # both for mapping and dataclasses setup
1045
1046 if inherited_table_args and not tablename:
1047 table_args = None
1048
1049 self.table_args = table_args
1050 self.tablename = tablename
1051 self.mapper_args_fn = mapper_args_fn
1052 self.table_fn = table_fn
1053
1054 def _setup_dataclasses_transforms(self) -> None:
1055 dataclass_setup_arguments = self.dataclass_setup_arguments
1056 if not dataclass_setup_arguments:
1057 return
1058
1059 # can't use is_dataclass since it uses hasattr
1060 if "__dataclass_fields__" in self.cls.__dict__:
1061 raise exc.InvalidRequestError(
1062 f"Class {self.cls} is already a dataclass; ensure that "
1063 "base classes / decorator styles of establishing dataclasses "
1064 "are not being mixed. "
1065 "This can happen if a class that inherits from "
1066 "'MappedAsDataclass', even indirectly, is been mapped with "
1067 "'@registry.mapped_as_dataclass'"
1068 )
1069
1070 warn_for_non_dc_attrs = collections.defaultdict(list)
1071
1072 def _allow_dataclass_field(
1073 key: str, originating_class: Type[Any]
1074 ) -> bool:
1075 if (
1076 originating_class is not self.cls
1077 and "__dataclass_fields__" not in originating_class.__dict__
1078 ):
1079 warn_for_non_dc_attrs[originating_class].append(key)
1080
1081 return True
1082
1083 manager = instrumentation.manager_of_class(self.cls)
1084 assert manager is not None
1085
1086 field_list = [
1087 _AttributeOptions._get_arguments_for_make_dataclass(
1088 key,
1089 anno,
1090 mapped_container,
1091 self.collected_attributes.get(key, _NoArg.NO_ARG),
1092 )
1093 for key, anno, mapped_container in (
1094 (
1095 key,
1096 mapped_anno if mapped_anno else raw_anno,
1097 mapped_container,
1098 )
1099 for key, (
1100 raw_anno,
1101 mapped_container,
1102 mapped_anno,
1103 is_dc,
1104 attr_value,
1105 originating_module,
1106 originating_class,
1107 ) in self.collected_annotations.items()
1108 if _allow_dataclass_field(key, originating_class)
1109 and (
1110 key not in self.collected_attributes
1111 # issue #9226; check for attributes that we've collected
1112 # which are already instrumented, which we would assume
1113 # mean we are in an ORM inheritance mapping and this
1114 # attribute is already mapped on the superclass. Under
1115 # no circumstance should any QueryableAttribute be sent to
1116 # the dataclass() function; anything that's mapped should
1117 # be Field and that's it
1118 or not isinstance(
1119 self.collected_attributes[key], QueryableAttribute
1120 )
1121 )
1122 )
1123 ]
1124
1125 if warn_for_non_dc_attrs:
1126 for (
1127 originating_class,
1128 non_dc_attrs,
1129 ) in warn_for_non_dc_attrs.items():
1130 util.warn_deprecated(
1131 f"When transforming {self.cls} to a dataclass, "
1132 f"attribute(s) "
1133 f"{', '.join(repr(key) for key in non_dc_attrs)} "
1134 f"originates from superclass "
1135 f"{originating_class}, which is not a dataclass. This "
1136 f"usage is deprecated and will raise an error in "
1137 f"SQLAlchemy 2.1. When declaring SQLAlchemy Declarative "
1138 f"Dataclasses, ensure that all mixin classes and other "
1139 f"superclasses which include attributes are also a "
1140 f"subclass of MappedAsDataclass.",
1141 "2.0",
1142 code="dcmx",
1143 )
1144
1145 annotations = {}
1146 defaults = {}
1147 for item in field_list:
1148 if len(item) == 2:
1149 name, tp = item
1150 elif len(item) == 3:
1151 name, tp, spec = item
1152 defaults[name] = spec
1153 else:
1154 assert False
1155 annotations[name] = tp
1156
1157 for k, v in defaults.items():
1158 setattr(self.cls, k, v)
1159
1160 self._apply_dataclasses_to_any_class(
1161 dataclass_setup_arguments, self.cls, annotations
1162 )
1163
1164 @classmethod
1165 def _update_annotations_for_non_mapped_class(
1166 cls, klass: Type[_O]
1167 ) -> Mapping[str, _AnnotationScanType]:
1168 cls_annotations = util.get_annotations(klass)
1169
1170 new_anno = {}
1171 for name, annotation in cls_annotations.items():
1172 if _is_mapped_annotation(annotation, klass, klass):
1173 extracted = _extract_mapped_subtype(
1174 annotation,
1175 klass,
1176 klass.__module__,
1177 name,
1178 type(None),
1179 required=False,
1180 is_dataclass_field=False,
1181 expect_mapped=False,
1182 )
1183 if extracted:
1184 inner, _ = extracted
1185 new_anno[name] = inner
1186 else:
1187 new_anno[name] = annotation
1188 return new_anno
1189
1190 @classmethod
1191 def _apply_dataclasses_to_any_class(
1192 cls,
1193 dataclass_setup_arguments: _DataclassArguments,
1194 klass: Type[_O],
1195 use_annotations: Mapping[str, _AnnotationScanType],
1196 ) -> None:
1197 cls._assert_dc_arguments(dataclass_setup_arguments)
1198
1199 dataclass_callable = dataclass_setup_arguments["dataclass_callable"]
1200 if dataclass_callable is _NoArg.NO_ARG:
1201 dataclass_callable = dataclasses.dataclass
1202
1203 restored: Optional[Any]
1204
1205 if use_annotations:
1206 # apply constructed annotations that should look "normal" to a
1207 # dataclasses callable, based on the fields present. This
1208 # means remove the Mapped[] container and ensure all Field
1209 # entries have an annotation
1210 restored = getattr(klass, "__annotations__", None)
1211 klass.__annotations__ = cast("Dict[str, Any]", use_annotations)
1212 else:
1213 restored = None
1214
1215 try:
1216 dataclass_callable(
1217 klass,
1218 **{
1219 k: v
1220 for k, v in dataclass_setup_arguments.items()
1221 if v is not _NoArg.NO_ARG and k != "dataclass_callable"
1222 },
1223 )
1224 except (TypeError, ValueError) as ex:
1225 raise exc.InvalidRequestError(
1226 f"Python dataclasses error encountered when creating "
1227 f"dataclass for {klass.__name__!r}: "
1228 f"{ex!r}. Please refer to Python dataclasses "
1229 "documentation for additional information.",
1230 code="dcte",
1231 ) from ex
1232 finally:
1233 # restore original annotations outside of the dataclasses
1234 # process; for mixins and __abstract__ superclasses, SQLAlchemy
1235 # Declarative will need to see the Mapped[] container inside the
1236 # annotations in order to map subclasses
1237 if use_annotations:
1238 if restored is None:
1239 del klass.__annotations__
1240 else:
1241 klass.__annotations__ = restored
1242
1243 @classmethod
1244 def _assert_dc_arguments(cls, arguments: _DataclassArguments) -> None:
1245 allowed = {
1246 "init",
1247 "repr",
1248 "order",
1249 "eq",
1250 "unsafe_hash",
1251 "kw_only",
1252 "match_args",
1253 "dataclass_callable",
1254 }
1255 disallowed_args = set(arguments).difference(allowed)
1256 if disallowed_args:
1257 msg = ", ".join(f"{arg!r}" for arg in sorted(disallowed_args))
1258 raise exc.ArgumentError(
1259 f"Dataclass argument(s) {msg} are not accepted"
1260 )
1261
1262 def _collect_annotation(
1263 self,
1264 name: str,
1265 raw_annotation: _AnnotationScanType,
1266 originating_class: Type[Any],
1267 expect_mapped: Optional[bool],
1268 attr_value: Any,
1269 ) -> Optional[_CollectedAnnotation]:
1270 if name in self.collected_annotations:
1271 return self.collected_annotations[name]
1272
1273 if raw_annotation is None:
1274 return None
1275
1276 is_dataclass = self.is_dataclass_prior_to_mapping
1277 allow_unmapped = self.allow_unmapped_annotations
1278
1279 if expect_mapped is None:
1280 is_dataclass_field = isinstance(attr_value, dataclasses.Field)
1281 expect_mapped = (
1282 not is_dataclass_field
1283 and not allow_unmapped
1284 and (
1285 attr_value is None
1286 or isinstance(attr_value, _MappedAttribute)
1287 )
1288 )
1289 else:
1290 is_dataclass_field = False
1291
1292 is_dataclass_field = False
1293 extracted = _extract_mapped_subtype(
1294 raw_annotation,
1295 self.cls,
1296 originating_class.__module__,
1297 name,
1298 type(attr_value),
1299 required=False,
1300 is_dataclass_field=is_dataclass_field,
1301 expect_mapped=expect_mapped
1302 and not is_dataclass, # self.allow_dataclass_fields,
1303 )
1304
1305 if extracted is None:
1306 # ClassVar can come out here
1307 return None
1308
1309 extracted_mapped_annotation, mapped_container = extracted
1310
1311 if attr_value is None and not is_literal(extracted_mapped_annotation):
1312 for elem in typing_get_args(extracted_mapped_annotation):
1313 if isinstance(elem, str) or is_fwd_ref(
1314 elem, check_generic=True
1315 ):
1316 elem = de_stringify_annotation(
1317 self.cls,
1318 elem,
1319 originating_class.__module__,
1320 include_generic=True,
1321 )
1322 # look in Annotated[...] for an ORM construct,
1323 # such as Annotated[int, mapped_column(primary_key=True)]
1324 if isinstance(elem, _IntrospectsAnnotations):
1325 attr_value = elem.found_in_pep593_annotated()
1326
1327 self.collected_annotations[name] = ca = _CollectedAnnotation(
1328 raw_annotation,
1329 mapped_container,
1330 extracted_mapped_annotation,
1331 is_dataclass,
1332 attr_value,
1333 originating_class.__module__,
1334 originating_class,
1335 )
1336 return ca
1337
1338 def _warn_for_decl_attributes(
1339 self, cls: Type[Any], key: str, c: Any
1340 ) -> None:
1341 if isinstance(c, expression.ColumnElement):
1342 util.warn(
1343 f"Attribute '{key}' on class {cls} appears to "
1344 "be a non-schema SQLAlchemy expression "
1345 "object; this won't be part of the declarative mapping. "
1346 "To map arbitrary expressions, use ``column_property()`` "
1347 "or a similar function such as ``deferred()``, "
1348 "``query_expression()`` etc. "
1349 )
1350
1351 def _produce_column_copies(
1352 self,
1353 attributes_for_class: Callable[
1354 [], Iterable[Tuple[str, Any, Any, bool]]
1355 ],
1356 attribute_is_overridden: Callable[[str, Any], bool],
1357 fixed_table: bool,
1358 originating_class: Type[Any],
1359 ) -> Dict[str, Union[Column[Any], MappedColumn[Any]]]:
1360 cls = self.cls
1361 dict_ = self.clsdict_view
1362 locally_collected_attributes = {}
1363 column_copies = self.column_copies
1364 # copy mixin columns to the mapped class
1365
1366 for name, obj, annotation, is_dataclass in attributes_for_class():
1367 if (
1368 not fixed_table
1369 and obj is None
1370 and _is_mapped_annotation(annotation, cls, originating_class)
1371 ):
1372 # obj is None means this is the annotation only path
1373
1374 if attribute_is_overridden(name, obj):
1375 # perform same "overridden" check as we do for
1376 # Column/MappedColumn, this is how a mixin col is not
1377 # applied to an inherited subclass that does not have
1378 # the mixin. the anno-only path added here for
1379 # #9564
1380 continue
1381
1382 collected_annotation = self._collect_annotation(
1383 name, annotation, originating_class, True, obj
1384 )
1385 obj = (
1386 collected_annotation.attr_value
1387 if collected_annotation is not None
1388 else obj
1389 )
1390 if obj is None:
1391 obj = MappedColumn()
1392
1393 locally_collected_attributes[name] = obj
1394 setattr(cls, name, obj)
1395
1396 elif isinstance(obj, (Column, MappedColumn)):
1397 if attribute_is_overridden(name, obj):
1398 # if column has been overridden
1399 # (like by the InstrumentedAttribute of the
1400 # superclass), skip. don't collect the annotation
1401 # either (issue #8718)
1402 continue
1403
1404 collected_annotation = self._collect_annotation(
1405 name, annotation, originating_class, True, obj
1406 )
1407 obj = (
1408 collected_annotation.attr_value
1409 if collected_annotation is not None
1410 else obj
1411 )
1412
1413 if name not in dict_ and not (
1414 "__table__" in dict_
1415 and (getattr(obj, "name", None) or name)
1416 in dict_["__table__"].c
1417 ):
1418 if obj.foreign_keys:
1419 for fk in obj.foreign_keys:
1420 if (
1421 fk._table_column is not None
1422 and fk._table_column.table is None
1423 ):
1424 raise exc.InvalidRequestError(
1425 "Columns with foreign keys to "
1426 "non-table-bound "
1427 "columns must be declared as "
1428 "@declared_attr callables "
1429 "on declarative mixin classes. "
1430 "For dataclass "
1431 "field() objects, use a lambda:."
1432 )
1433
1434 column_copies[obj] = copy_ = obj._copy()
1435
1436 locally_collected_attributes[name] = copy_
1437 setattr(cls, name, copy_)
1438
1439 return locally_collected_attributes
1440
1441 def _extract_mappable_attributes(self) -> None:
1442 cls = self.cls
1443 collected_attributes = self.collected_attributes
1444
1445 our_stuff = self.properties
1446
1447 _include_dunders = self._include_dunders
1448
1449 late_mapped = _get_immediate_cls_attr(
1450 cls, "_sa_decl_prepare_nocascade", strict=True
1451 )
1452
1453 allow_unmapped_annotations = self.allow_unmapped_annotations
1454 expect_annotations_wo_mapped = (
1455 allow_unmapped_annotations or self.is_dataclass_prior_to_mapping
1456 )
1457
1458 look_for_dataclass_things = bool(self.dataclass_setup_arguments)
1459
1460 for k in list(collected_attributes):
1461 if k in _include_dunders:
1462 continue
1463
1464 value = collected_attributes[k]
1465
1466 if _is_declarative_props(value):
1467 # @declared_attr in collected_attributes only occurs here for a
1468 # @declared_attr that's directly on the mapped class;
1469 # for a mixin, these have already been evaluated
1470 if value._cascading:
1471 util.warn(
1472 "Use of @declared_attr.cascading only applies to "
1473 "Declarative 'mixin' and 'abstract' classes. "
1474 "Currently, this flag is ignored on mapped class "
1475 "%s" % self.cls
1476 )
1477
1478 value = getattr(cls, k)
1479
1480 elif (
1481 isinstance(value, QueryableAttribute)
1482 and value.class_ is not cls
1483 and value.key != k
1484 ):
1485 # detect a QueryableAttribute that's already mapped being
1486 # assigned elsewhere in userland, turn into a synonym()
1487 value = SynonymProperty(value.key)
1488 setattr(cls, k, value)
1489
1490 if (
1491 isinstance(value, tuple)
1492 and len(value) == 1
1493 and isinstance(value[0], (Column, _MappedAttribute))
1494 ):
1495 util.warn(
1496 "Ignoring declarative-like tuple value of attribute "
1497 "'%s': possibly a copy-and-paste error with a comma "
1498 "accidentally placed at the end of the line?" % k
1499 )
1500 continue
1501 elif look_for_dataclass_things and isinstance(
1502 value, dataclasses.Field
1503 ):
1504 # we collected a dataclass Field; dataclasses would have
1505 # set up the correct state on the class
1506 continue
1507 elif not isinstance(value, (Column, _DCAttributeOptions)):
1508 # using @declared_attr for some object that
1509 # isn't Column/MapperProperty/_DCAttributeOptions; remove
1510 # from the clsdict_view
1511 # and place the evaluated value onto the class.
1512 collected_attributes.pop(k)
1513 self._warn_for_decl_attributes(cls, k, value)
1514 if not late_mapped:
1515 setattr(cls, k, value)
1516 continue
1517 # we expect to see the name 'metadata' in some valid cases;
1518 # however at this point we see it's assigned to something trying
1519 # to be mapped, so raise for that.
1520 # TODO: should "registry" here be also? might be too late
1521 # to change that now (2.0 betas)
1522 elif k in ("metadata",):
1523 raise exc.InvalidRequestError(
1524 f"Attribute name '{k}' is reserved when using the "
1525 "Declarative API."
1526 )
1527 elif isinstance(value, Column):
1528 _undefer_column_name(
1529 k, self.column_copies.get(value, value) # type: ignore
1530 )
1531 else:
1532 if isinstance(value, _IntrospectsAnnotations):
1533 (
1534 annotation,
1535 mapped_container,
1536 extracted_mapped_annotation,
1537 is_dataclass,
1538 attr_value,
1539 originating_module,
1540 originating_class,
1541 ) = self.collected_annotations.get(
1542 k, (None, None, None, False, None, None, None)
1543 )
1544
1545 # issue #8692 - don't do any annotation interpretation if
1546 # an annotation were present and a container such as
1547 # Mapped[] etc. were not used. If annotation is None,
1548 # do declarative_scan so that the property can raise
1549 # for required
1550 if (
1551 mapped_container is not None
1552 or annotation is None
1553 # issue #10516: need to do declarative_scan even with
1554 # a non-Mapped annotation if we are doing
1555 # __allow_unmapped__, for things like col.name
1556 # assignment
1557 or allow_unmapped_annotations
1558 ):
1559 try:
1560 value.declarative_scan(
1561 self,
1562 self.registry,
1563 cls,
1564 originating_module,
1565 k,
1566 mapped_container,
1567 annotation,
1568 extracted_mapped_annotation,
1569 is_dataclass,
1570 )
1571 except NameError as ne:
1572 raise exc.ArgumentError(
1573 f"Could not resolve all types within mapped "
1574 f'annotation: "{annotation}". Ensure all '
1575 f"types are written correctly and are "
1576 f"imported within the module in use."
1577 ) from ne
1578 else:
1579 # assert that we were expecting annotations
1580 # without Mapped[] were going to be passed.
1581 # otherwise an error should have been raised
1582 # by util._extract_mapped_subtype before we got here.
1583 assert expect_annotations_wo_mapped
1584
1585 if isinstance(value, _DCAttributeOptions):
1586 if (
1587 value._has_dataclass_arguments
1588 and not look_for_dataclass_things
1589 ):
1590 if isinstance(value, MapperProperty):
1591 argnames = [
1592 "init",
1593 "default_factory",
1594 "repr",
1595 "default",
1596 ]
1597 else:
1598 argnames = ["init", "default_factory", "repr"]
1599
1600 args = {
1601 a
1602 for a in argnames
1603 if getattr(
1604 value._attribute_options, f"dataclasses_{a}"
1605 )
1606 is not _NoArg.NO_ARG
1607 }
1608
1609 raise exc.ArgumentError(
1610 f"Attribute '{k}' on class {cls} includes "
1611 f"dataclasses argument(s): "
1612 f"{', '.join(sorted(repr(a) for a in args))} but "
1613 f"class does not specify "
1614 "SQLAlchemy native dataclass configuration."
1615 )
1616
1617 if not isinstance(value, (MapperProperty, _MapsColumns)):
1618 # filter for _DCAttributeOptions objects that aren't
1619 # MapperProperty / mapped_column(). Currently this
1620 # includes AssociationProxy. pop it from the things
1621 # we're going to map and set it up as a descriptor
1622 # on the class.
1623 collected_attributes.pop(k)
1624
1625 # Assoc Prox (or other descriptor object that may
1626 # use _DCAttributeOptions) is usually here, except if
1627 # 1. we're a
1628 # dataclass, dataclasses would have removed the
1629 # attr here or 2. assoc proxy is coming from a
1630 # superclass, we want it to be direct here so it
1631 # tracks state or 3. assoc prox comes from
1632 # declared_attr, uncommon case
1633 setattr(cls, k, value)
1634 continue
1635
1636 our_stuff[k] = value
1637
1638 def _extract_declared_columns(self) -> None:
1639 our_stuff = self.properties
1640
1641 # extract columns from the class dict
1642 declared_columns = self.declared_columns
1643 column_ordering = self.column_ordering
1644 name_to_prop_key = collections.defaultdict(set)
1645
1646 for key, c in list(our_stuff.items()):
1647 if isinstance(c, _MapsColumns):
1648 mp_to_assign = c.mapper_property_to_assign
1649 if mp_to_assign:
1650 our_stuff[key] = mp_to_assign
1651 else:
1652 # if no mapper property to assign, this currently means
1653 # this is a MappedColumn that will produce a Column for us
1654 del our_stuff[key]
1655
1656 for col, sort_order in c.columns_to_assign:
1657 if not isinstance(c, CompositeProperty):
1658 name_to_prop_key[col.name].add(key)
1659 declared_columns.add(col)
1660
1661 # we would assert this, however we want the below
1662 # warning to take effect instead. See #9630
1663 # assert col not in column_ordering
1664
1665 column_ordering[col] = sort_order
1666
1667 # if this is a MappedColumn and the attribute key we
1668 # have is not what the column has for its key, map the
1669 # Column explicitly under the attribute key name.
1670 # otherwise, Mapper will map it under the column key.
1671 if mp_to_assign is None and key != col.key:
1672 our_stuff[key] = col
1673 elif isinstance(c, Column):
1674 # undefer previously occurred here, and now occurs earlier.
1675 # ensure every column we get here has been named
1676 assert c.name is not None
1677 name_to_prop_key[c.name].add(key)
1678 declared_columns.add(c)
1679 # if the column is the same name as the key,
1680 # remove it from the explicit properties dict.
1681 # the normal rules for assigning column-based properties
1682 # will take over, including precedence of columns
1683 # in multi-column ColumnProperties.
1684 if key == c.key:
1685 del our_stuff[key]
1686
1687 for name, keys in name_to_prop_key.items():
1688 if len(keys) > 1:
1689 util.warn(
1690 "On class %r, Column object %r named "
1691 "directly multiple times, "
1692 "only one will be used: %s. "
1693 "Consider using orm.synonym instead"
1694 % (self.classname, name, (", ".join(sorted(keys))))
1695 )
1696
1697 def _setup_table(self, table: Optional[FromClause] = None) -> None:
1698 cls = self.cls
1699 cls_as_Decl = cast("MappedClassProtocol[Any]", cls)
1700
1701 tablename = self.tablename
1702 table_args = self.table_args
1703 clsdict_view = self.clsdict_view
1704 declared_columns = self.declared_columns
1705 column_ordering = self.column_ordering
1706
1707 manager = attributes.manager_of_class(cls)
1708
1709 if (
1710 self.table_fn is None
1711 and "__table__" not in clsdict_view
1712 and table is None
1713 ):
1714 if hasattr(cls, "__table_cls__"):
1715 table_cls = cast(
1716 Type[Table],
1717 util.unbound_method_to_callable(cls.__table_cls__), # type: ignore # noqa: E501
1718 )
1719 else:
1720 table_cls = Table
1721
1722 if tablename is not None:
1723 args: Tuple[Any, ...] = ()
1724 table_kw: Dict[str, Any] = {}
1725
1726 if table_args:
1727 if isinstance(table_args, dict):
1728 table_kw = table_args
1729 elif isinstance(table_args, tuple):
1730 if isinstance(table_args[-1], dict):
1731 args, table_kw = table_args[0:-1], table_args[-1]
1732 else:
1733 args = table_args
1734
1735 autoload_with = clsdict_view.get("__autoload_with__")
1736 if autoload_with:
1737 table_kw["autoload_with"] = autoload_with
1738
1739 autoload = clsdict_view.get("__autoload__")
1740 if autoload:
1741 table_kw["autoload"] = True
1742
1743 sorted_columns = sorted(
1744 declared_columns,
1745 key=lambda c: column_ordering.get(c, 0),
1746 )
1747 table = self.set_cls_attribute(
1748 "__table__",
1749 table_cls(
1750 tablename,
1751 self._metadata_for_cls(manager),
1752 *sorted_columns,
1753 *args,
1754 **table_kw,
1755 ),
1756 )
1757 else:
1758 if table is None:
1759 if self.table_fn:
1760 table = self.set_cls_attribute(
1761 "__table__", self.table_fn()
1762 )
1763 else:
1764 table = cls_as_Decl.__table__
1765 if declared_columns:
1766 for c in declared_columns:
1767 if not table.c.contains_column(c):
1768 raise exc.ArgumentError(
1769 "Can't add additional column %r when "
1770 "specifying __table__" % c.key
1771 )
1772
1773 self.local_table = table
1774
1775 def _metadata_for_cls(self, manager: ClassManager[Any]) -> MetaData:
1776 meta: Optional[MetaData] = getattr(self.cls, "metadata", None)
1777 if meta is not None:
1778 return meta
1779 else:
1780 return manager.registry.metadata
1781
1782 def _setup_inheriting_mapper(self, mapper_kw: _MapperKwArgs) -> None:
1783 cls = self.cls
1784
1785 inherits = mapper_kw.get("inherits", None)
1786
1787 if inherits is None:
1788 # since we search for classical mappings now, search for
1789 # multiple mapped bases as well and raise an error.
1790 inherits_search = []
1791 for base_ in cls.__bases__:
1792 c = _resolve_for_abstract_or_classical(base_)
1793 if c is None:
1794 continue
1795
1796 if _is_supercls_for_inherits(c) and c not in inherits_search:
1797 inherits_search.append(c)
1798
1799 if inherits_search:
1800 if len(inherits_search) > 1:
1801 raise exc.InvalidRequestError(
1802 "Class %s has multiple mapped bases: %r"
1803 % (cls, inherits_search)
1804 )
1805 inherits = inherits_search[0]
1806 elif isinstance(inherits, Mapper):
1807 inherits = inherits.class_
1808
1809 self.inherits = inherits
1810
1811 clsdict_view = self.clsdict_view
1812 if "__table__" not in clsdict_view and self.tablename is None:
1813 self.single = True
1814
1815 def _setup_inheriting_columns(self, mapper_kw: _MapperKwArgs) -> None:
1816 table = self.local_table
1817 cls = self.cls
1818 table_args = self.table_args
1819 declared_columns = self.declared_columns
1820
1821 if (
1822 table is None
1823 and self.inherits is None
1824 and not _get_immediate_cls_attr(cls, "__no_table__")
1825 ):
1826 raise exc.InvalidRequestError(
1827 "Class %r does not have a __table__ or __tablename__ "
1828 "specified and does not inherit from an existing "
1829 "table-mapped class." % cls
1830 )
1831 elif self.inherits:
1832 inherited_mapper_or_config = _declared_mapping_info(self.inherits)
1833 assert inherited_mapper_or_config is not None
1834 inherited_table = inherited_mapper_or_config.local_table
1835 inherited_persist_selectable = (
1836 inherited_mapper_or_config.persist_selectable
1837 )
1838
1839 if table is None:
1840 # single table inheritance.
1841 # ensure no table args
1842 if table_args:
1843 raise exc.ArgumentError(
1844 "Can't place __table_args__ on an inherited class "
1845 "with no table."
1846 )
1847
1848 # add any columns declared here to the inherited table.
1849 if declared_columns and not isinstance(inherited_table, Table):
1850 raise exc.ArgumentError(
1851 f"Can't declare columns on single-table-inherited "
1852 f"subclass {self.cls}; superclass {self.inherits} "
1853 "is not mapped to a Table"
1854 )
1855
1856 for col in declared_columns:
1857 assert inherited_table is not None
1858 if col.name in inherited_table.c:
1859 if inherited_table.c[col.name] is col:
1860 continue
1861 raise exc.ArgumentError(
1862 f"Column '{col}' on class {cls.__name__} "
1863 f"conflicts with existing column "
1864 f"'{inherited_table.c[col.name]}'. If using "
1865 f"Declarative, consider using the "
1866 "use_existing_column parameter of mapped_column() "
1867 "to resolve conflicts."
1868 )
1869 if col.primary_key:
1870 raise exc.ArgumentError(
1871 "Can't place primary key columns on an inherited "
1872 "class with no table."
1873 )
1874
1875 if TYPE_CHECKING:
1876 assert isinstance(inherited_table, Table)
1877
1878 inherited_table.append_column(col)
1879 if (
1880 inherited_persist_selectable is not None
1881 and inherited_persist_selectable is not inherited_table
1882 ):
1883 inherited_persist_selectable._refresh_for_new_column(
1884 col
1885 )
1886
1887 def _prepare_mapper_arguments(self, mapper_kw: _MapperKwArgs) -> None:
1888 properties = self.properties
1889
1890 if self.mapper_args_fn:
1891 mapper_args = self.mapper_args_fn()
1892 else:
1893 mapper_args = {}
1894
1895 if mapper_kw:
1896 mapper_args.update(mapper_kw)
1897
1898 if "properties" in mapper_args:
1899 properties = dict(properties)
1900 properties.update(mapper_args["properties"])
1901
1902 # make sure that column copies are used rather
1903 # than the original columns from any mixins
1904 for k in ("version_id_col", "polymorphic_on"):
1905 if k in mapper_args:
1906 v = mapper_args[k]
1907 mapper_args[k] = self.column_copies.get(v, v)
1908
1909 if "primary_key" in mapper_args:
1910 mapper_args["primary_key"] = [
1911 self.column_copies.get(v, v)
1912 for v in util.to_list(mapper_args["primary_key"])
1913 ]
1914
1915 if "inherits" in mapper_args:
1916 inherits_arg = mapper_args["inherits"]
1917 if isinstance(inherits_arg, Mapper):
1918 inherits_arg = inherits_arg.class_
1919
1920 if inherits_arg is not self.inherits:
1921 raise exc.InvalidRequestError(
1922 "mapper inherits argument given for non-inheriting "
1923 "class %s" % (mapper_args["inherits"])
1924 )
1925
1926 if self.inherits:
1927 mapper_args["inherits"] = self.inherits
1928
1929 if self.inherits and not mapper_args.get("concrete", False):
1930 # note the superclass is expected to have a Mapper assigned and
1931 # not be a deferred config, as this is called within map()
1932 inherited_mapper = class_mapper(self.inherits, False)
1933 inherited_table = inherited_mapper.local_table
1934
1935 # single or joined inheritance
1936 # exclude any cols on the inherited table which are
1937 # not mapped on the parent class, to avoid
1938 # mapping columns specific to sibling/nephew classes
1939 if "exclude_properties" not in mapper_args:
1940 mapper_args["exclude_properties"] = exclude_properties = {
1941 c.key
1942 for c in inherited_table.c
1943 if c not in inherited_mapper._columntoproperty
1944 }.union(inherited_mapper.exclude_properties or ())
1945 exclude_properties.difference_update(
1946 [c.key for c in self.declared_columns]
1947 )
1948
1949 # look through columns in the current mapper that
1950 # are keyed to a propname different than the colname
1951 # (if names were the same, we'd have popped it out above,
1952 # in which case the mapper makes this combination).
1953 # See if the superclass has a similar column property.
1954 # If so, join them together.
1955 for k, col in list(properties.items()):
1956 if not isinstance(col, expression.ColumnElement):
1957 continue
1958 if k in inherited_mapper._props:
1959 p = inherited_mapper._props[k]
1960 if isinstance(p, ColumnProperty):
1961 # note here we place the subclass column
1962 # first. See [ticket:1892] for background.
1963 properties[k] = [col] + p.columns
1964 result_mapper_args = mapper_args.copy()
1965 result_mapper_args["properties"] = properties
1966 self.mapper_args = result_mapper_args
1967
1968 def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]:
1969 self._prepare_mapper_arguments(mapper_kw)
1970 if hasattr(self.cls, "__mapper_cls__"):
1971 mapper_cls = cast(
1972 "Type[Mapper[Any]]",
1973 util.unbound_method_to_callable(
1974 self.cls.__mapper_cls__ # type: ignore
1975 ),
1976 )
1977 else:
1978 mapper_cls = Mapper
1979
1980 return self.set_cls_attribute(
1981 "__mapper__",
1982 mapper_cls(self.cls, self.local_table, **self.mapper_args),
1983 )
1984
1985
1986@util.preload_module("sqlalchemy.orm.decl_api")
1987def _as_dc_declaredattr(
1988 field_metadata: Mapping[str, Any], sa_dataclass_metadata_key: str
1989) -> Any:
1990 # wrap lambdas inside dataclass fields inside an ad-hoc declared_attr.
1991 # we can't write it because field.metadata is immutable :( so we have
1992 # to go through extra trouble to compare these
1993 decl_api = util.preloaded.orm_decl_api
1994 obj = field_metadata[sa_dataclass_metadata_key]
1995 if callable(obj) and not isinstance(obj, decl_api.declared_attr):
1996 return decl_api.declared_attr(obj)
1997 else:
1998 return obj
1999
2000
2001class _DeferredMapperConfig(_ClassScanMapperConfig):
2002 _cls: weakref.ref[Type[Any]]
2003
2004 is_deferred = True
2005
2006 _configs: util.OrderedDict[
2007 weakref.ref[Type[Any]], _DeferredMapperConfig
2008 ] = util.OrderedDict()
2009
2010 def _early_mapping(self, mapper_kw: _MapperKwArgs) -> None:
2011 pass
2012
2013 # mypy disallows plain property override of variable
2014 @property # type: ignore
2015 def cls(self) -> Type[Any]:
2016 return self._cls() # type: ignore
2017
2018 @cls.setter
2019 def cls(self, class_: Type[Any]) -> None:
2020 self._cls = weakref.ref(class_, self._remove_config_cls)
2021 self._configs[self._cls] = self
2022
2023 @classmethod
2024 def _remove_config_cls(cls, ref: weakref.ref[Type[Any]]) -> None:
2025 cls._configs.pop(ref, None)
2026
2027 @classmethod
2028 def has_cls(cls, class_: Type[Any]) -> bool:
2029 # 2.6 fails on weakref if class_ is an old style class
2030 return isinstance(class_, type) and weakref.ref(class_) in cls._configs
2031
2032 @classmethod
2033 def raise_unmapped_for_cls(cls, class_: Type[Any]) -> NoReturn:
2034 if hasattr(class_, "_sa_raise_deferred_config"):
2035 class_._sa_raise_deferred_config()
2036
2037 raise orm_exc.UnmappedClassError(
2038 class_,
2039 msg=(
2040 f"Class {orm_exc._safe_cls_name(class_)} has a deferred "
2041 "mapping on it. It is not yet usable as a mapped class."
2042 ),
2043 )
2044
2045 @classmethod
2046 def config_for_cls(cls, class_: Type[Any]) -> _DeferredMapperConfig:
2047 return cls._configs[weakref.ref(class_)]
2048
2049 @classmethod
2050 def classes_for_base(
2051 cls, base_cls: Type[Any], sort: bool = True
2052 ) -> List[_DeferredMapperConfig]:
2053 classes_for_base = [
2054 m
2055 for m, cls_ in [(m, m.cls) for m in cls._configs.values()]
2056 if cls_ is not None and issubclass(cls_, base_cls)
2057 ]
2058
2059 if not sort:
2060 return classes_for_base
2061
2062 all_m_by_cls = {m.cls: m for m in classes_for_base}
2063
2064 tuples: List[Tuple[_DeferredMapperConfig, _DeferredMapperConfig]] = []
2065 for m_cls in all_m_by_cls:
2066 tuples.extend(
2067 (all_m_by_cls[base_cls], all_m_by_cls[m_cls])
2068 for base_cls in m_cls.__bases__
2069 if base_cls in all_m_by_cls
2070 )
2071 return list(topological.sort(tuples, classes_for_base))
2072
2073 def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]:
2074 self._configs.pop(self._cls, None)
2075 return super().map(mapper_kw)
2076
2077
2078def _add_attribute(
2079 cls: Type[Any], key: str, value: MapperProperty[Any]
2080) -> None:
2081 """add an attribute to an existing declarative class.
2082
2083 This runs through the logic to determine MapperProperty,
2084 adds it to the Mapper, adds a column to the mapped Table, etc.
2085
2086 """
2087
2088 if "__mapper__" in cls.__dict__:
2089 mapped_cls = cast("MappedClassProtocol[Any]", cls)
2090
2091 def _table_or_raise(mc: MappedClassProtocol[Any]) -> Table:
2092 if isinstance(mc.__table__, Table):
2093 return mc.__table__
2094 raise exc.InvalidRequestError(
2095 f"Cannot add a new attribute to mapped class {mc.__name__!r} "
2096 "because it's not mapped against a table."
2097 )
2098
2099 if isinstance(value, Column):
2100 _undefer_column_name(key, value)
2101 _table_or_raise(mapped_cls).append_column(
2102 value, replace_existing=True
2103 )
2104 mapped_cls.__mapper__.add_property(key, value)
2105 elif isinstance(value, _MapsColumns):
2106 mp = value.mapper_property_to_assign
2107 for col, _ in value.columns_to_assign:
2108 _undefer_column_name(key, col)
2109 _table_or_raise(mapped_cls).append_column(
2110 col, replace_existing=True
2111 )
2112 if not mp:
2113 mapped_cls.__mapper__.add_property(key, col)
2114 if mp:
2115 mapped_cls.__mapper__.add_property(key, mp)
2116 elif isinstance(value, MapperProperty):
2117 mapped_cls.__mapper__.add_property(key, value)
2118 elif isinstance(value, QueryableAttribute) and value.key != key:
2119 # detect a QueryableAttribute that's already mapped being
2120 # assigned elsewhere in userland, turn into a synonym()
2121 value = SynonymProperty(value.key)
2122 mapped_cls.__mapper__.add_property(key, value)
2123 else:
2124 type.__setattr__(cls, key, value)
2125 mapped_cls.__mapper__._expire_memoizations()
2126 else:
2127 type.__setattr__(cls, key, value)
2128
2129
2130def _del_attribute(cls: Type[Any], key: str) -> None:
2131 if (
2132 "__mapper__" in cls.__dict__
2133 and key in cls.__dict__
2134 and not cast(
2135 "MappedClassProtocol[Any]", cls
2136 ).__mapper__._dispose_called
2137 ):
2138 value = cls.__dict__[key]
2139 if isinstance(
2140 value, (Column, _MapsColumns, MapperProperty, QueryableAttribute)
2141 ):
2142 raise NotImplementedError(
2143 "Can't un-map individual mapped attributes on a mapped class."
2144 )
2145 else:
2146 type.__delattr__(cls, key)
2147 cast(
2148 "MappedClassProtocol[Any]", cls
2149 ).__mapper__._expire_memoizations()
2150 else:
2151 type.__delattr__(cls, key)
2152
2153
2154def _declarative_constructor(self: Any, **kwargs: Any) -> None:
2155 """A simple constructor that allows initialization from kwargs.
2156
2157 Sets attributes on the constructed instance using the names and
2158 values in ``kwargs``.
2159
2160 Only keys that are present as
2161 attributes of the instance's class are allowed. These could be,
2162 for example, any mapped columns or relationships.
2163 """
2164 cls_ = type(self)
2165 for k in kwargs:
2166 if not hasattr(cls_, k):
2167 raise TypeError(
2168 "%r is an invalid keyword argument for %s" % (k, cls_.__name__)
2169 )
2170 setattr(self, k, kwargs[k])
2171
2172
2173_declarative_constructor.__name__ = "__init__"
2174
2175
2176def _undefer_column_name(key: str, column: Column[Any]) -> None:
2177 if column.key is None:
2178 column.key = key
2179 if column.name is None:
2180 column.name = key