1# orm/decl_base.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7"""Internal implementation for declarative."""
8from __future__ import absolute_import
9
10import collections
11import weakref
12
13from sqlalchemy.orm import attributes
14from sqlalchemy.orm import instrumentation
15from . import clsregistry
16from . import exc as orm_exc
17from . import mapper as mapperlib
18from .attributes import InstrumentedAttribute
19from .attributes import QueryableAttribute
20from .base import _is_mapped_class
21from .base import InspectionAttr
22from .descriptor_props import CompositeProperty
23from .descriptor_props import SynonymProperty
24from .interfaces import MapperProperty
25from .mapper import Mapper as mapper
26from .properties import ColumnProperty
27from .util import class_mapper
28from .. import event
29from .. import exc
30from .. import util
31from ..sql import expression
32from ..sql.schema import Column
33from ..sql.schema import Table
34from ..util import topological
35
36
def _declared_mapping_info(cls):
    """Return whatever mapping information is currently held for ``cls``.

    If a deferred configuration is registered for the class, that
    configuration object is returned.  Otherwise, if the class is mapped,
    the result of ``class_mapper(cls, configure=False)`` is returned.
    ``None`` is returned when neither applies.

    """
    # a pending deferred configuration is checked before a regular mapping
    if _DeferredMapperConfig.has_cls(cls):
        return _DeferredMapperConfig.config_for_cls(cls)

    if _is_mapped_class(cls):
        return class_mapper(cls, configure=False)

    return None
46
47
def _resolve_for_abstract_or_classical(cls):
    """Resolve ``cls`` to the class that should be considered for mapping.

    * ``object`` resolves to ``None``.
    * a class carrying ``__abstract__`` in its own ``__dict__`` resolves to
      the first non-``None`` resolution among its direct bases, or ``None``
      if there is none.
    * any other class resolves to the ``class_`` of the class manager found
      by :func:`._dive_for_cls_manager`, or to the class itself when no
      manager is found.

    """
    if cls is object:
        return None

    if not cls.__dict__.get("__abstract__", False):
        manager = _dive_for_cls_manager(cls)
        return manager.class_ if manager else cls

    # abstract class: look through the direct bases, in order
    for parent in cls.__bases__:
        resolved = _resolve_for_abstract_or_classical(parent)
        if resolved is not None:
            return resolved
    return None
66
67
def _get_immediate_cls_attr(cls, attrname, strict=False):
    """Return an attribute that applies "immediately" to ``cls``.

    The attribute is returned when it is present directly in the
    ``__dict__`` of ``cls``, or when it is found in the ``__dict__`` of a
    superclass for which no class manager can be located, i.e. a
    non-mapped mixin.  With ``strict`` set, only direct bases of ``cls``
    are accepted as such a superclass.  ``None`` is returned otherwise.

    This is used to detect attributes that say something about a mapped
    class independently of any mapped classes it may inherit from.

    """

    # the rules are different for this name than others,
    # make sure we've moved it out.  transitional
    assert attrname != "__abstract__"

    if not issubclass(cls, object):
        return None

    if attrname in cls.__dict__:
        return getattr(cls, attrname)

    for ancestor in cls.__mro__[1:]:
        classically_mapped = _dive_for_cls_manager(ancestor)

        if attrname not in ancestor.__dict__:
            continue

        if ancestor is cls:
            return getattr(ancestor, attrname)

        close_enough = ancestor in cls.__bases__ if strict else True
        if close_enough and not classically_mapped:
            return getattr(ancestor, attrname)

    return None
103
104
def _dive_for_cls_manager(cls):
    """Return the first class manager found along the MRO of ``cls``.

    Returns ``None`` if ``cls`` has no ``__mro__`` or if no class in the
    MRO produces a truthy result from ``attributes.manager_of_class()``.

    """
    # because the class manager registration is pluggable,
    # we need to do the search for every class in the hierarchy,
    # rather than just a simple "cls._sa_class_manager"

    # python 2 old style class
    if not hasattr(cls, "__mro__"):
        return None

    # evaluated lazily, so the search stops at the first manager found
    managers = (attributes.manager_of_class(klass) for klass in cls.__mro__)
    return next((found for found in managers if found), None)
119
120
def _as_declarative(registry, cls, dict_):
    """Set up a mapping for ``cls`` from the attributes found on the class.

    Declarative scans the class for attributes, so no table is passed
    (``None``) and no mapper keyword arguments are passed (an empty
    dictionary).  Returns the result of
    ``_MapperConfig.setup_mapping()``.

    """
    table = None
    mapper_kw = {}
    return _MapperConfig.setup_mapping(registry, cls, dict_, table, mapper_kw)
127
128
def _mapper(registry, cls, table, mapper_kw):
    """Map ``cls`` to ``table`` imperatively; return ``cls.__mapper__``."""
    # the configuration object is built only for its side effects
    _ImperativeMapperConfig(registry, cls, table, mapper_kw)
    installed_mapper = cls.__mapper__
    return installed_mapper
132
133
@util.preload_module("sqlalchemy.orm.decl_api")
def _is_declarative_props(obj):
    """Return True if ``obj`` is a ``declared_attr`` or ``classproperty``."""
    decl_api = util.preloaded.orm_decl_api
    declarative_types = (decl_api.declared_attr, util.classproperty)
    return isinstance(obj, declarative_types)
139
140
def _check_declared_props_nocascade(obj, name, cls):
    """Report whether ``obj`` is a declarative property.

    Returns ``False`` when ``obj`` is not a declarative property.  When it
    is, ``True`` is returned, and a warning is emitted first if the object
    has a truthy ``_cascading`` attribute.

    """
    if not _is_declarative_props(obj):
        return False

    if getattr(obj, "_cascading", False):
        util.warn(
            "@declared_attr.cascading is not supported on the %s "
            "attribute on class %s. This attribute invokes for "
            "subclasses in any case." % (name, cls)
        )
    return True
152
153
class _MapperConfig(object):
    """Base class for objects that configure the mapping of one class.

    Subclasses are expected to supply a ``map()`` method, which
    :meth:`._early_mapping` invokes.

    """

    __slots__ = (
        "cls",
        "classname",
        "properties",
        "declared_attr_reg",
        "__weakref__",
    )

    @classmethod
    def setup_mapping(cls, registry, cls_, dict_, table, mapper_kw):
        """Create the configuration object appropriate for ``cls_``.

        Returns ``None`` for a class that has ``__abstract__`` set in its
        own ``__dict__``.  Otherwise a :class:`._DeferredMapperConfig` is
        constructed when the class indicates deferred preparation, else a
        :class:`._ClassScanMapperConfig`.

        """
        # NOTE(review): this lookup is made against ``cls`` (the
        # configuration class), not ``cls_`` (the class being mapped), so
        # the guard below looks like it cannot match in practice.
        # Confirm whether ``cls_`` was intended before changing it.
        manager = attributes.manager_of_class(cls)
        if manager and manager.class_ is cls_:
            raise exc.InvalidRequestError(
                "Class %r already has been " "instrumented declaratively" % cls
            )

        if cls_.__dict__.get("__abstract__", False):
            return

        defer_map = _get_immediate_cls_attr(
            cls_, "_sa_decl_prepare_nocascade", strict=True
        ) or hasattr(cls_, "_sa_decl_prepare")

        cfg_cls = _DeferredMapperConfig if defer_map else _ClassScanMapperConfig

        return cfg_cls(registry, cls_, dict_, table, mapper_kw)

    def __init__(self, registry, cls_, mapper_kw):
        """Record the target class and set up its instrumentation.

        Raises ``InvalidRequestError`` when ``non_primary`` is requested
        for a class that has no manager or whose manager is not mapped.

        """
        self.cls = util.assert_arg_type(cls_, type, "cls_")
        self.classname = cls_.__name__
        self.properties = util.OrderedDict()
        self.declared_attr_reg = {}

        if mapper_kw.get("non_primary", False):
            # a non primary mapper requires an existing primary mapping
            manager = attributes.manager_of_class(self.cls)
            if not manager or not manager.is_mapped:
                raise exc.InvalidRequestError(
                    "Class %s has no primary mapper configured. Configure "
                    "a primary mapper first before setting up a non primary "
                    "Mapper." % self.cls
                )
            return

        instrumentation.register_class(
            self.cls,
            finalize=False,
            registry=registry,
            declarative_scan=self,
            init_method=registry.constructor,
        )

    def set_cls_attribute(self, attrname, value):
        """Install ``value`` on the class via its manager; return it."""
        class_manager = instrumentation.manager_of_class(self.cls)
        class_manager.install_member(attrname, value)
        return value

    def _early_mapping(self, mapper_kw):
        """Invoke ``self.map()`` with the given mapper keyword arguments."""
        self.map(mapper_kw)
216
217
class _ImperativeMapperConfig(_MapperConfig):
    """Configuration for a class mapped against an explicitly given table.

    The class is not scanned for attributes; the table and mapper keyword
    arguments are passed in directly.

    """

    __slots__ = ("dict_", "local_table", "inherits")

    def __init__(
        self,
        registry,
        cls_,
        table,
        mapper_kw,
    ):
        """Install ``table`` as ``__table__`` and map ``cls_`` to it."""
        super(_ImperativeMapperConfig, self).__init__(
            registry, cls_, mapper_kw
        )

        self.dict_ = {}
        self.local_table = self.set_cls_attribute("__table__", table)

        with mapperlib._CONFIGURE_MUTEX:
            if not mapper_kw.get("non_primary", False):
                clsregistry.add_class(
                    self.classname, self.cls, registry._class_registry
                )

            self._setup_inheritance(mapper_kw)

            self._early_mapping(mapper_kw)

    def map(self, mapper_kw=util.EMPTY_DICT):
        """Construct the mapper and install it as ``__mapper__``.

        Returns the newly constructed mapper.

        """
        mapper_cls = mapper

        return self.set_cls_attribute(
            "__mapper__",
            mapper_cls(self.cls, self.local_table, **mapper_kw),
        )

    def _setup_inheritance(self, mapper_kw):
        """Determine the superclass to inherit from; set ``self.inherits``.

        An explicit ``inherits`` entry in ``mapper_kw`` is used when given
        (a mapper is reduced to its ``class_``).  Otherwise the direct
        bases of the class are searched for mapped classes.

        Raises ``InvalidRequestError`` if more than one distinct mapped
        base is found.

        """
        cls = self.cls

        inherits = mapper_kw.get("inherits", None)

        if inherits is None:
            # since we search for classical mappings now, search for
            # multiple mapped bases as well and raise an error.
            inherits_search = []
            for c in cls.__bases__:
                c = _resolve_for_abstract_or_classical(c)
                if c is None:
                    continue
                if _declared_mapping_info(
                    c
                ) is not None and not _get_immediate_cls_attr(
                    c, "_sa_decl_prepare_nocascade", strict=True
                ):
                    # several abstract bases may resolve to the same
                    # mapped class; count it once only, consistent with
                    # _ClassScanMapperConfig._setup_inheritance
                    if c not in inherits_search:
                        inherits_search.append(c)

            if inherits_search:
                if len(inherits_search) > 1:
                    raise exc.InvalidRequestError(
                        "Class %s has multiple mapped bases: %r"
                        % (cls, inherits_search)
                    )
                inherits = inherits_search[0]
        elif isinstance(inherits, mapper):
            inherits = inherits.class_

        self.inherits = inherits
284
285
286class _ClassScanMapperConfig(_MapperConfig):
287 __slots__ = (
288 "dict_",
289 "local_table",
290 "persist_selectable",
291 "declared_columns",
292 "column_copies",
293 "table_args",
294 "tablename",
295 "mapper_args",
296 "mapper_args_fn",
297 "inherits",
298 )
299
    def __init__(
        self,
        registry,
        cls_,
        dict_,
        table,
        mapper_kw,
    ):
        """Scan ``cls_`` and carry out its declarative configuration.

        The steps run in a fixed order: the class dictionary is copied,
        the base initializer registers instrumentation, declared events
        are set up and attributes are scanned.  Then, while holding
        ``mapperlib._CONFIGURE_MUTEX``, the class is added to the class
        registry, mappable attributes and declared columns are extracted,
        the table and inheritance are set up, and ``_early_mapping()`` is
        invoked.

        """

        # grab class dict before the instrumentation manager has been added.
        # reduces cycles
        self.dict_ = dict(dict_) if dict_ else {}

        super(_ClassScanMapperConfig, self).__init__(registry, cls_, mapper_kw)

        self.persist_selectable = None
        self.declared_columns = set()
        self.column_copies = {}
        self._setup_declared_events()

        self._scan_attributes()

        with mapperlib._CONFIGURE_MUTEX:
            clsregistry.add_class(
                self.classname, self.cls, registry._class_registry
            )

            self._extract_mappable_attributes()

            self._extract_declared_columns()

            self._setup_table(table)

            self._setup_inheritance(mapper_kw)

            self._early_mapping(mapper_kw)
336
337 def _setup_declared_events(self):
338 if _get_immediate_cls_attr(self.cls, "__declare_last__"):
339
340 @event.listens_for(mapper, "after_configured")
341 def after_configured():
342 self.cls.__declare_last__()
343
344 if _get_immediate_cls_attr(self.cls, "__declare_first__"):
345
346 @event.listens_for(mapper, "before_configured")
347 def before_configured():
348 self.cls.__declare_first__()
349
    def _cls_attr_override_checker(self, cls):
        """Produce a function that checks if a class has overridden an
        attribute, taking SQLAlchemy-enabled dataclass fields into account.

        The returned function accepts ``(key, obj)`` and returns True when
        the attribute named ``key`` as seen on ``cls`` is considered to be
        something other than ``obj``.

        """
        sa_dataclass_metadata_key = _get_immediate_cls_attr(
            cls, "__sa_dataclass_metadata_key__", None
        )

        if sa_dataclass_metadata_key is None:

            # no dataclass metadata key: a plain identity comparison
            # against normal class-level attribute access is used
            def attribute_is_overridden(key, obj):
                return getattr(cls, key) is not obj

        else:

            # field name -> object stored under the metadata key, for all
            # dataclass fields of cls and for the local ones only
            all_datacls_fields = {
                f.name: f.metadata[sa_dataclass_metadata_key]
                for f in util.dataclass_fields(cls)
                if sa_dataclass_metadata_key in f.metadata
            }
            local_datacls_fields = {
                f.name: f.metadata[sa_dataclass_metadata_key]
                for f in util.local_dataclass_fields(cls)
                if sa_dataclass_metadata_key in f.metadata
            }

            # sentinel meaning "no such field"
            absent = object()

            def attribute_is_overridden(key, obj):
                # declarative props are compared by their underlying
                # function
                if _is_declarative_props(obj):
                    obj = obj.fget

                # this function likely has some failure modes still if
                # someone is doing a deep mixing of the same attribute
                # name as plain Python attribute vs. dataclass field.

                # a local dataclass field decides the result outright
                ret = local_datacls_fields.get(key, absent)
                if _is_declarative_props(ret):
                    ret = ret.fget

                if ret is obj:
                    return False
                elif ret is not absent:
                    return True

                all_field = all_datacls_fields.get(key, absent)

                # falls back to ``obj`` itself when the class has no such
                # attribute
                ret = getattr(cls, key, obj)

                if ret is obj:
                    return False

                # for dataclasses, this could be the
                # 'default' of the field.  so filter more specifically
                # for an already-mapped InstrumentedAttribute
                if ret is not absent and isinstance(
                    ret, InstrumentedAttribute
                ):
                    return True

                if all_field is obj:
                    return False
                elif all_field is not absent:
                    return True

                # can't find another attribute
                return False

        return attribute_is_overridden
420
421 def _cls_attr_resolver(self, cls):
422 """produce a function to iterate the "attributes" of a class,
423 adjusting for SQLAlchemy fields embedded in dataclass fields.
424
425 """
426 sa_dataclass_metadata_key = _get_immediate_cls_attr(
427 cls, "__sa_dataclass_metadata_key__", None
428 )
429
430 if sa_dataclass_metadata_key is None:
431
432 def local_attributes_for_class():
433 for name, obj in vars(cls).items():
434 yield name, obj, False
435
436 else:
437 field_names = set()
438
439 def local_attributes_for_class():
440 for field in util.local_dataclass_fields(cls):
441 if sa_dataclass_metadata_key in field.metadata:
442 field_names.add(field.name)
443 yield field.name, _as_dc_declaredattr(
444 field.metadata, sa_dataclass_metadata_key
445 ), True
446 for name, obj in vars(cls).items():
447 if name not in field_names:
448 yield name, obj, False
449
450 return local_attributes_for_class
451
    def _scan_attributes(self):
        """Scan the class and its MRO for declarative directives.

        Works in two passes over ``cls.__mro__``.  The first pass collects
        each base along with copies of standalone columns found on
        non-mapped bases.  The second pass looks at every attribute of
        every base, picking up ``__mapper_args__``, ``__tablename__`` and
        ``__table_args__`` and placing mixin-provided attributes into
        ``self.dict_``.

        Sets ``self.table_args``, ``self.tablename`` and
        ``self.mapper_args_fn``.

        """
        cls = self.cls
        dict_ = self.dict_
        column_copies = self.column_copies
        mapper_args_fn = None
        table_args = inherited_table_args = None
        tablename = None

        attribute_is_overridden = self._cls_attr_override_checker(self.cls)

        bases = []

        for base in cls.__mro__:
            # collect bases and make sure standalone columns are copied
            # to be the column they will ultimately be on the class,
            # so that declared_attr functions use the right columns.
            # need to do this all the way up the hierarchy first
            # (see #8190)

            class_mapped = (
                base is not cls
                and _declared_mapping_info(base) is not None
                and not _get_immediate_cls_attr(
                    base, "_sa_decl_prepare_nocascade", strict=True
                )
            )

            local_attributes_for_class = self._cls_attr_resolver(base)

            if not class_mapped and base is not cls:
                locally_collected_columns = self._produce_column_copies(
                    local_attributes_for_class,
                    attribute_is_overridden,
                )
            else:
                locally_collected_columns = {}

            bases.append(
                (
                    base,
                    class_mapped,
                    local_attributes_for_class,
                    locally_collected_columns,
                )
            )

        for (
            base,
            class_mapped,
            local_attributes_for_class,
            locally_collected_columns,
        ) in bases:

            # this transfer can also take place as we scan each name
            # for finer-grained control of how collected_attributes is
            # populated, as this is what impacts column ordering.
            # however it's simpler to get it out of the way here.
            dict_.update(locally_collected_columns)

            for name, obj, is_dataclass in local_attributes_for_class():
                if name == "__mapper_args__":
                    check_decl = _check_declared_props_nocascade(
                        obj, name, cls
                    )
                    if not mapper_args_fn and (not class_mapped or check_decl):
                        # don't even invoke __mapper_args__ until
                        # after we've determined everything about the
                        # mapped table.
                        # make a copy of it so a class-level dictionary
                        # is not overwritten when we update column-based
                        # arguments.
                        def mapper_args_fn():
                            return dict(cls.__mapper_args__)

                elif name == "__tablename__":
                    check_decl = _check_declared_props_nocascade(
                        obj, name, cls
                    )
                    if not tablename and (not class_mapped or check_decl):
                        tablename = cls.__tablename__
                elif name == "__table_args__":
                    check_decl = _check_declared_props_nocascade(
                        obj, name, cls
                    )
                    if not table_args and (not class_mapped or check_decl):
                        table_args = cls.__table_args__
                        if not isinstance(
                            table_args, (tuple, dict, type(None))
                        ):
                            raise exc.ArgumentError(
                                "__table_args__ value must be a tuple, "
                                "dict, or None"
                            )
                        if base is not cls:
                            inherited_table_args = True
                elif class_mapped:
                    # attributes of an already-mapped superclass are not
                    # re-applied to the subclass
                    if _is_declarative_props(obj):
                        util.warn(
                            "Regular (i.e. not __special__) "
                            "attribute '%s.%s' uses @declared_attr, "
                            "but owning class %s is mapped - "
                            "not applying to subclass %s."
                            % (base.__name__, name, base, cls)
                        )
                    continue
                elif base is not cls:
                    # we're a mixin, abstract base, or something that is
                    # acting like that for now.
                    if isinstance(obj, Column):
                        # already copied columns to the mapped class.
                        continue
                    elif isinstance(obj, MapperProperty):
                        raise exc.InvalidRequestError(
                            "Mapper properties (i.e. deferred,"
                            "column_property(), relationship(), etc.) must "
                            "be declared as @declared_attr callables "
                            "on declarative mixin classes. For dataclass "
                            "field() objects, use a lambda:"
                        )
                    elif _is_declarative_props(obj):
                        if obj._cascading:
                            if name in dict_:
                                # unfortunately, while we can use the user-
                                # defined attribute here to allow a clean
                                # override, if there's another
                                # subclass below then it still tries to use
                                # this. not sure if there is enough
                                # information here to add this as a feature
                                # later on.
                                util.warn(
                                    "Attribute '%s' on class %s cannot be "
                                    "processed due to "
                                    "@declared_attr.cascading; "
                                    "skipping" % (name, cls)
                                )
                            dict_[name] = column_copies[
                                obj
                            ] = ret = obj.__get__(obj, cls)
                            setattr(cls, name, ret)
                        else:
                            if is_dataclass:
                                # access attribute using normal class access
                                # first, to see if it's been mapped on a
                                # superclass. note if the dataclasses.field()
                                # has "default", this value can be anything.
                                ret = getattr(cls, name, None)

                                # so, if it's anything that's not ORM
                                # mapped, assume we should invoke the
                                # declared_attr
                                if not isinstance(ret, InspectionAttr):
                                    ret = obj.fget()
                            else:
                                # access attribute using normal class access.
                                # if the declared attr already took place
                                # on a superclass that is mapped, then
                                # this is no longer a declared_attr, it will
                                # be the InstrumentedAttribute
                                ret = getattr(cls, name)

                            # correct for proxies created from hybrid_property
                            # or similar. note there is no known case that
                            # produces nested proxies, so we are only
                            # looking one level deep right now.
                            if (
                                isinstance(ret, InspectionAttr)
                                and ret._is_internal_proxy
                                and not isinstance(
                                    ret.original_property, MapperProperty
                                )
                            ):
                                ret = ret.descriptor

                            dict_[name] = column_copies[obj] = ret
                        # carry the docstring of the declared_attr over
                        # when the resulting object has none of its own
                        if (
                            isinstance(ret, (Column, MapperProperty))
                            and ret.doc is None
                        ):
                            ret.doc = obj.__doc__
                    # here, the attribute is some other kind of property that
                    # we assume is not part of the declarative mapping.
                    # however, check for some more common mistakes
                    else:
                        self._warn_for_decl_attributes(base, name, obj)
                elif is_dataclass and (
                    name not in dict_ or dict_[name] is not obj
                ):
                    # here, we are definitely looking at the target class
                    # and not a superclass. this is currently a
                    # dataclass-only path. if the name is only
                    # a dataclass field and isn't in local cls.__dict__,
                    # put the object there.
                    # assert that the dataclass-enabled resolver agrees
                    # with what we are seeing

                    assert not attribute_is_overridden(name, obj)

                    if _is_declarative_props(obj):
                        obj = obj.fget()

                    dict_[name] = obj

        # __table_args__ that came from a base are discarded when no
        # table name was established
        if inherited_table_args and not tablename:
            table_args = None

        self.table_args = table_args
        self.tablename = tablename
        self.mapper_args_fn = mapper_args_fn
660
661 def _warn_for_decl_attributes(self, cls, key, c):
662 if isinstance(c, expression.ColumnClause):
663 util.warn(
664 "Attribute '%s' on class %s appears to be a non-schema "
665 "'sqlalchemy.sql.column()' "
666 "object; this won't be part of the declarative mapping"
667 % (key, cls)
668 )
669
670 def _produce_column_copies(
671 self, attributes_for_class, attribute_is_overridden
672 ):
673 cls = self.cls
674 dict_ = self.dict_
675 locally_collected_attributes = {}
676 column_copies = self.column_copies
677 # copy mixin columns to the mapped class
678
679 for name, obj, is_dataclass in attributes_for_class():
680 if isinstance(obj, Column):
681 if attribute_is_overridden(name, obj):
682 # if column has been overridden
683 # (like by the InstrumentedAttribute of the
684 # superclass), skip
685 continue
686 elif obj.foreign_keys:
687 raise exc.InvalidRequestError(
688 "Columns with foreign keys to other columns "
689 "must be declared as @declared_attr callables "
690 "on declarative mixin classes. For dataclass "
691 "field() objects, use a lambda:."
692 )
693 elif name not in dict_ and not (
694 "__table__" in dict_
695 and (obj.name or name) in dict_["__table__"].c
696 ):
697 column_copies[obj] = copy_ = obj._copy()
698 copy_._creation_order = obj._creation_order
699 setattr(cls, name, copy_)
700 locally_collected_attributes[name] = copy_
701 return locally_collected_attributes
702
    def _extract_mappable_attributes(self):
        """Move mappable values from ``self.dict_`` into ``self.properties``.

        Only ``Column`` and ``MapperProperty`` values end up in
        ``self.properties``.  Declarative props are resolved through normal
        class attribute access first, and an already-mapped
        ``QueryableAttribute`` of another class assigned under a different
        name is turned into a ``SynonymProperty``.

        Raises ``InvalidRequestError`` if a mappable value is assigned to
        the name ``metadata``.

        """
        cls = self.cls
        dict_ = self.dict_

        our_stuff = self.properties

        late_mapped = _get_immediate_cls_attr(
            cls, "_sa_decl_prepare_nocascade", strict=True
        )

        # iterate over a copy of the keys; dict_ is modified in the loop
        for k in list(dict_):

            if k in ("__table__", "__tablename__", "__mapper_args__"):
                continue

            value = dict_[k]
            if _is_declarative_props(value):
                if value._cascading:
                    util.warn(
                        "Use of @declared_attr.cascading only applies to "
                        "Declarative 'mixin' and 'abstract' classes. "
                        "Currently, this flag is ignored on mapped class "
                        "%s" % self.cls
                    )

                value = getattr(cls, k)

            elif (
                isinstance(value, QueryableAttribute)
                and value.class_ is not cls
                and value.key != k
            ):
                # detect a QueryableAttribute that's already mapped being
                # assigned elsewhere in userland, turn into a synonym()
                value = SynonymProperty(value.key)
                setattr(cls, k, value)

            if (
                isinstance(value, tuple)
                and len(value) == 1
                and isinstance(value[0], (Column, MapperProperty))
            ):
                util.warn(
                    "Ignoring declarative-like tuple value of attribute "
                    "'%s': possibly a copy-and-paste error with a comma "
                    "accidentally placed at the end of the line?" % k
                )
                continue
            elif not isinstance(value, (Column, MapperProperty)):
                # using @declared_attr for some object that
                # isn't Column/MapperProperty; remove from the dict_
                # and place the evaluated value onto the class.
                if not k.startswith("__"):
                    dict_.pop(k)
                    self._warn_for_decl_attributes(cls, k, value)
                    if not late_mapped:
                        setattr(cls, k, value)
                continue
            # we expect to see the name 'metadata' in some valid cases;
            # however at this point we see it's assigned to something trying
            # to be mapped, so raise for that.
            elif k == "metadata":
                raise exc.InvalidRequestError(
                    "Attribute name 'metadata' is reserved "
                    "for the MetaData instance when using a "
                    "declarative base class."
                )
            our_stuff[k] = value
771
772 def _extract_declared_columns(self):
773 our_stuff = self.properties
774
775 # set up attributes in the order they were created
776 util.sort_dictionary(
777 our_stuff, key=lambda key: our_stuff[key]._creation_order
778 )
779
780 # extract columns from the class dict
781 declared_columns = self.declared_columns
782 name_to_prop_key = collections.defaultdict(set)
783 for key, c in list(our_stuff.items()):
784 if isinstance(c, (ColumnProperty, CompositeProperty)):
785 for col in c.columns:
786 if isinstance(col, Column) and col.table is None:
787 _undefer_column_name(key, col)
788 if not isinstance(c, CompositeProperty):
789 name_to_prop_key[col.name].add(key)
790 declared_columns.add(col)
791 elif isinstance(c, Column):
792 _undefer_column_name(key, c)
793 name_to_prop_key[c.name].add(key)
794 declared_columns.add(c)
795 # if the column is the same name as the key,
796 # remove it from the explicit properties dict.
797 # the normal rules for assigning column-based properties
798 # will take over, including precedence of columns
799 # in multi-column ColumnProperties.
800 if key == c.key:
801 del our_stuff[key]
802
803 for name, keys in name_to_prop_key.items():
804 if len(keys) > 1:
805 util.warn(
806 "On class %r, Column object %r named "
807 "directly multiple times, "
808 "only one will be used: %s. "
809 "Consider using orm.synonym instead"
810 % (self.classname, name, (", ".join(sorted(keys))))
811 )
812
813 def _setup_table(self, table=None):
814 cls = self.cls
815 tablename = self.tablename
816 table_args = self.table_args
817 dict_ = self.dict_
818 declared_columns = self.declared_columns
819
820 manager = attributes.manager_of_class(cls)
821
822 declared_columns = self.declared_columns = sorted(
823 declared_columns, key=lambda c: c._creation_order
824 )
825
826 if "__table__" not in dict_ and table is None:
827 if hasattr(cls, "__table_cls__"):
828 table_cls = util.unbound_method_to_callable(cls.__table_cls__)
829 else:
830 table_cls = Table
831
832 if tablename is not None:
833
834 args, table_kw = (), {}
835 if table_args:
836 if isinstance(table_args, dict):
837 table_kw = table_args
838 elif isinstance(table_args, tuple):
839 if isinstance(table_args[-1], dict):
840 args, table_kw = table_args[0:-1], table_args[-1]
841 else:
842 args = table_args
843
844 autoload_with = dict_.get("__autoload_with__")
845 if autoload_with:
846 table_kw["autoload_with"] = autoload_with
847
848 autoload = dict_.get("__autoload__")
849 if autoload:
850 table_kw["autoload"] = True
851
852 table = self.set_cls_attribute(
853 "__table__",
854 table_cls(
855 tablename,
856 self._metadata_for_cls(manager),
857 *(tuple(declared_columns) + tuple(args)),
858 **table_kw
859 ),
860 )
861 else:
862 if table is None:
863 table = cls.__table__
864 if declared_columns:
865 for c in declared_columns:
866 if not table.c.contains_column(c):
867 raise exc.ArgumentError(
868 "Can't add additional column %r when "
869 "specifying __table__" % c.key
870 )
871 self.local_table = table
872
873 def _metadata_for_cls(self, manager):
874 if hasattr(self.cls, "metadata"):
875 return self.cls.metadata
876 else:
877 return manager.registry.metadata
878
879 def _setup_inheritance(self, mapper_kw):
880 table = self.local_table
881 cls = self.cls
882 table_args = self.table_args
883 declared_columns = self.declared_columns
884
885 inherits = mapper_kw.get("inherits", None)
886
887 if inherits is None:
888 # since we search for classical mappings now, search for
889 # multiple mapped bases as well and raise an error.
890 inherits_search = []
891 for c in cls.__bases__:
892 c = _resolve_for_abstract_or_classical(c)
893 if c is None:
894 continue
895 if _declared_mapping_info(
896 c
897 ) is not None and not _get_immediate_cls_attr(
898 c, "_sa_decl_prepare_nocascade", strict=True
899 ):
900 if c not in inherits_search:
901 inherits_search.append(c)
902
903 if inherits_search:
904 if len(inherits_search) > 1:
905 raise exc.InvalidRequestError(
906 "Class %s has multiple mapped bases: %r"
907 % (cls, inherits_search)
908 )
909 inherits = inherits_search[0]
910 elif isinstance(inherits, mapper):
911 inherits = inherits.class_
912
913 self.inherits = inherits
914
915 if (
916 table is None
917 and self.inherits is None
918 and not _get_immediate_cls_attr(cls, "__no_table__")
919 ):
920
921 raise exc.InvalidRequestError(
922 "Class %r does not have a __table__ or __tablename__ "
923 "specified and does not inherit from an existing "
924 "table-mapped class." % cls
925 )
926 elif self.inherits:
927 inherited_mapper = _declared_mapping_info(self.inherits)
928 inherited_table = inherited_mapper.local_table
929 inherited_persist_selectable = inherited_mapper.persist_selectable
930
931 if table is None:
932 # single table inheritance.
933 # ensure no table args
934 if table_args:
935 raise exc.ArgumentError(
936 "Can't place __table_args__ on an inherited class "
937 "with no table."
938 )
939 # add any columns declared here to the inherited table.
940 for c in declared_columns:
941 if c.name in inherited_table.c:
942 if inherited_table.c[c.name] is c:
943 continue
944 raise exc.ArgumentError(
945 "Column '%s' on class %s conflicts with "
946 "existing column '%s'"
947 % (c, cls, inherited_table.c[c.name])
948 )
949 if c.primary_key:
950 raise exc.ArgumentError(
951 "Can't place primary key columns on an inherited "
952 "class with no table."
953 )
954 inherited_table.append_column(c)
955 if (
956 inherited_persist_selectable is not None
957 and inherited_persist_selectable is not inherited_table
958 ):
959 inherited_persist_selectable._refresh_for_new_column(c)
960
961 def _prepare_mapper_arguments(self, mapper_kw):
962 properties = self.properties
963
964 if self.mapper_args_fn:
965 mapper_args = self.mapper_args_fn()
966 else:
967 mapper_args = {}
968
969 if mapper_kw:
970 mapper_args.update(mapper_kw)
971
972 if "properties" in mapper_args:
973 properties = dict(properties)
974 properties.update(mapper_args["properties"])
975
976 # make sure that column copies are used rather
977 # than the original columns from any mixins
978 for k in ("version_id_col", "polymorphic_on"):
979 if k in mapper_args:
980 v = mapper_args[k]
981 mapper_args[k] = self.column_copies.get(v, v)
982
983 if "inherits" in mapper_args:
984 inherits_arg = mapper_args["inherits"]
985 if isinstance(inherits_arg, mapper):
986 inherits_arg = inherits_arg.class_
987
988 if inherits_arg is not self.inherits:
989 raise exc.InvalidRequestError(
990 "mapper inherits argument given for non-inheriting "
991 "class %s" % (mapper_args["inherits"])
992 )
993
994 if self.inherits:
995 mapper_args["inherits"] = self.inherits
996
997 if self.inherits and not mapper_args.get("concrete", False):
998 # single or joined inheritance
999 # exclude any cols on the inherited table which are
1000 # not mapped on the parent class, to avoid
1001 # mapping columns specific to sibling/nephew classes
1002 inherited_mapper = _declared_mapping_info(self.inherits)
1003 inherited_table = inherited_mapper.local_table
1004
1005 if "exclude_properties" not in mapper_args:
1006 mapper_args["exclude_properties"] = exclude_properties = set(
1007 [
1008 c.key
1009 for c in inherited_table.c
1010 if c not in inherited_mapper._columntoproperty
1011 ]
1012 ).union(inherited_mapper.exclude_properties or ())
1013 exclude_properties.difference_update(
1014 [c.key for c in self.declared_columns]
1015 )
1016
1017 # look through columns in the current mapper that
1018 # are keyed to a propname different than the colname
1019 # (if names were the same, we'd have popped it out above,
1020 # in which case the mapper makes this combination).
1021 # See if the superclass has a similar column property.
1022 # If so, join them together.
1023 for k, col in list(properties.items()):
1024 if not isinstance(col, expression.ColumnElement):
1025 continue
1026 if k in inherited_mapper._props:
1027 p = inherited_mapper._props[k]
1028 if isinstance(p, ColumnProperty):
1029 # note here we place the subclass column
1030 # first. See [ticket:1892] for background.
1031 properties[k] = [col] + p.columns
1032 result_mapper_args = mapper_args.copy()
1033 result_mapper_args["properties"] = properties
1034 self.mapper_args = result_mapper_args
1035
1036 def map(self, mapper_kw=util.EMPTY_DICT):
1037 self._prepare_mapper_arguments(mapper_kw)
1038 if hasattr(self.cls, "__mapper_cls__"):
1039 mapper_cls = util.unbound_method_to_callable(
1040 self.cls.__mapper_cls__
1041 )
1042 else:
1043 mapper_cls = mapper
1044
1045 return self.set_cls_attribute(
1046 "__mapper__",
1047 mapper_cls(self.cls, self.local_table, **self.mapper_args),
1048 )
1049
1050
@util.preload_module("sqlalchemy.orm.decl_api")
def _as_dc_declaredattr(field_metadata, sa_dataclass_metadata_key):
    """Return the object stored in a dataclass field's metadata, wrapping
    it in a ``declared_attr`` if it is a callable that is not one already.

    """
    # wrap lambdas inside dataclass fields inside an ad-hoc declared_attr.
    # we can't write it because field.metadata is immutable :( so we have
    # to go through extra trouble to compare these
    decl_api = util.preloaded.orm_decl_api
    obj = field_metadata[sa_dataclass_metadata_key]
    needs_wrapping = callable(obj) and not isinstance(
        obj, decl_api.declared_attr
    )
    return decl_api.declared_attr(obj) if needs_wrapping else obj
1062
1063
class _DeferredMapperConfig(_ClassScanMapperConfig):
    """A class-scanning configuration whose mapping step is postponed.

    Instances are tracked in the class-level ``_configs`` dictionary,
    keyed by a weak reference to the class being configured, until
    :meth:`.map` is called.

    """

    _configs = util.OrderedDict()

    def _early_mapping(self, mapper_kw):
        # nothing is mapped at construction time
        pass

    @property
    def cls(self):
        """The class being configured, dereferenced from a weak reference."""
        return self._cls()

    @cls.setter
    def cls(self, class_):
        # the weakref callback removes the entry from _configs
        class_ref = weakref.ref(class_, self._remove_config_cls)
        self._cls = class_ref
        self._configs[class_ref] = self

    @classmethod
    def _remove_config_cls(cls, ref):
        """Discard the configuration stored under the weak reference."""
        cls._configs.pop(ref, None)

    @classmethod
    def has_cls(cls, class_):
        """Return True if a deferred configuration exists for ``class_``."""
        # 2.6 fails on weakref if class_ is an old style class
        if not isinstance(class_, type):
            return False
        return weakref.ref(class_) in cls._configs

    @classmethod
    def raise_unmapped_for_cls(cls, class_):
        """Raise for use of a class whose mapping is still deferred.

        ``class_._sa_raise_deferred_config()`` is called first if the
        class has that attribute; ``UnmappedClassError`` is raised
        afterwards.

        """
        if hasattr(class_, "_sa_raise_deferred_config"):
            class_._sa_raise_deferred_config()

        raise orm_exc.UnmappedClassError(
            class_,
            msg="Class %s has a deferred mapping on it. It is not yet "
            "usable as a mapped class." % orm_exc._safe_cls_name(class_),
        )

    @classmethod
    def config_for_cls(cls, class_):
        """Return the deferred configuration registered for ``class_``."""
        return cls._configs[weakref.ref(class_)]

    @classmethod
    def classes_for_base(cls, base_cls, sort=True):
        """Return the deferred configurations for subclasses of ``base_cls``.

        With ``sort`` set, the configurations are passed through
        ``topological.sort()`` using (base configuration, subclass
        configuration) pairs built from the direct bases of each class.

        """
        config_and_class = [
            (config, config.cls) for config in cls._configs.values()
        ]
        matching = [
            config
            for config, target in config_and_class
            if target is not None and issubclass(target, base_cls)
        ]

        if not sort:
            return matching

        config_by_class = dict((config.cls, config) for config in matching)

        dependencies = []
        for mapped_cls in config_by_class:
            for parent in mapped_cls.__bases__:
                if parent in config_by_class:
                    dependencies.append(
                        (config_by_class[parent], config_by_class[mapped_cls])
                    )
        return list(topological.sort(dependencies, matching))

    def map(self, mapper_kw=util.EMPTY_DICT):
        """Remove this configuration from the pending set, then map."""
        self._configs.pop(self._cls, None)
        return super(_DeferredMapperConfig, self).map(mapper_kw)
1128
1129
def _add_attribute(cls, key, value):
    """Add an attribute to an existing declarative class.

    If ``cls`` has no ``__mapper__`` in its own ``__dict__``, the
    attribute is simply set on the class.  Otherwise columns are appended
    to ``cls.__table__`` and properties are added to ``cls.__mapper__``
    as appropriate for the type of ``value``; any other value is set on
    the class and the mapper's memoizations are expired.

    """

    if "__mapper__" not in cls.__dict__:
        type.__setattr__(cls, key, value)
        return

    if isinstance(value, Column):
        _undefer_column_name(key, value)
        cls.__table__.append_column(value, replace_existing=True)
        cls.__mapper__.add_property(key, value)
        return

    if isinstance(value, ColumnProperty):
        for col in value.columns:
            if not isinstance(col, Column) or col.table is not None:
                continue
            _undefer_column_name(key, col)
            cls.__table__.append_column(col, replace_existing=True)
        cls.__mapper__.add_property(key, value)
        return

    if isinstance(value, MapperProperty):
        cls.__mapper__.add_property(key, value)
        return

    if isinstance(value, QueryableAttribute) and value.key != key:
        # detect a QueryableAttribute that's already mapped being
        # assigned elsewhere in userland, turn into a synonym()
        synonym = SynonymProperty(value.key)
        cls.__mapper__.add_property(key, synonym)
        return

    type.__setattr__(cls, key, value)
    cls.__mapper__._expire_memoizations()
1161
1162
def _del_attribute(cls, key):
    """Delete an attribute from a declarative class.

    For a class that has its own ``__mapper__`` which has not been
    disposed, deleting a locally present mapped attribute raises
    ``NotImplementedError``; deleting any other locally present attribute
    also expires the mapper's memoizations.  In all other cases the
    attribute is simply deleted from the class.

    """

    locally_present_on_mapped = (
        "__mapper__" in cls.__dict__
        and key in cls.__dict__
        and not cls.__mapper__._dispose_called
    )

    if not locally_present_on_mapped:
        type.__delattr__(cls, key)
        return

    value = cls.__dict__[key]
    mapped_types = (Column, ColumnProperty, MapperProperty, QueryableAttribute)
    if isinstance(value, mapped_types):
        raise NotImplementedError(
            "Can't un-map individual mapped attributes on a mapped class."
        )

    type.__delattr__(cls, key)
    cls.__mapper__._expire_memoizations()
1182
1183
def _declarative_constructor(self, **kwargs):
    """A simple constructor that allows initialization from kwargs.

    Each keyword is set as an attribute on the constructed instance,
    in the order given.

    Only names that are present as attributes of the instance's class
    are accepted, for example any mapped columns or relationships.
    ``TypeError`` is raised at the first name that is not.
    """
    owner = type(self)
    for name, value in kwargs.items():
        if not hasattr(owner, name):
            raise TypeError(
                "%r is an invalid keyword argument for %s"
                % (name, owner.__name__)
            )
        setattr(self, name, value)


_declarative_constructor.__name__ = "__init__"
1204
1205
def _undefer_column_name(key, column):
    """Fill in ``column.key`` and ``column.name`` from ``key`` when unset.

    Each of the two attributes is assigned only if it is currently
    ``None``; ``key`` is handled before ``name``.

    """
    for attr in ("key", "name"):
        if getattr(column, attr) is None:
            setattr(column, attr, key)