Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/decl_base.py: 55%
565 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1# ext/declarative/base.py
2# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7"""Internal implementation for declarative."""
8from __future__ import absolute_import
10import collections
11import weakref
13from sqlalchemy.orm import attributes
14from sqlalchemy.orm import instrumentation
15from . import clsregistry
16from . import exc as orm_exc
17from . import mapper as mapperlib
18from .attributes import InstrumentedAttribute
19from .attributes import QueryableAttribute
20from .base import _is_mapped_class
21from .base import InspectionAttr
22from .descriptor_props import CompositeProperty
23from .descriptor_props import SynonymProperty
24from .interfaces import MapperProperty
25from .mapper import Mapper as mapper
26from .properties import ColumnProperty
27from .util import class_mapper
28from .. import event
29from .. import exc
30from .. import util
31from ..sql import expression
32from ..sql.schema import Column
33from ..sql.schema import Table
34from ..util import topological
37def _declared_mapping_info(cls):
38 # deferred mapping
39 if _DeferredMapperConfig.has_cls(cls):
40 return _DeferredMapperConfig.config_for_cls(cls)
41 # regular mapping
42 elif _is_mapped_class(cls):
43 return class_mapper(cls, configure=False)
44 else:
45 return None
48def _resolve_for_abstract_or_classical(cls):
49 if cls is object:
50 return None
52 if cls.__dict__.get("__abstract__", False):
53 for sup in cls.__bases__:
54 sup = _resolve_for_abstract_or_classical(sup)
55 if sup is not None:
56 return sup
57 else:
58 return None
59 else:
60 clsmanager = _dive_for_cls_manager(cls)
62 if clsmanager:
63 return clsmanager.class_
64 else:
65 return cls
68def _get_immediate_cls_attr(cls, attrname, strict=False):
69 """return an attribute of the class that is either present directly
70 on the class, e.g. not on a superclass, or is from a superclass but
71 this superclass is a non-mapped mixin, that is, not a descendant of
72 the declarative base and is also not classically mapped.
74 This is used to detect attributes that indicate something about
75 a mapped class independently from any mapped classes that it may
76 inherit from.
78 """
80 # the rules are different for this name than others,
81 # make sure we've moved it out. transitional
82 assert attrname != "__abstract__"
84 if not issubclass(cls, object):
85 return None
87 if attrname in cls.__dict__:
88 return getattr(cls, attrname)
90 for base in cls.__mro__[1:]:
91 _is_classicial_inherits = _dive_for_cls_manager(base)
93 if attrname in base.__dict__ and (
94 base is cls
95 or (
96 (base in cls.__bases__ if strict else True)
97 and not _is_classicial_inherits
98 )
99 ):
100 return getattr(base, attrname)
101 else:
102 return None
105def _dive_for_cls_manager(cls):
106 # because the class manager registration is pluggable,
107 # we need to do the search for every class in the hierarchy,
108 # rather than just a simple "cls._sa_class_manager"
110 # python 2 old style class
111 if not hasattr(cls, "__mro__"):
112 return None
114 for base in cls.__mro__:
115 manager = attributes.manager_of_class(base)
116 if manager:
117 return manager
118 return None
121def _as_declarative(registry, cls, dict_):
123 # declarative scans the class for attributes. no table or mapper
124 # args passed separately.
126 return _MapperConfig.setup_mapping(registry, cls, dict_, None, {})
129def _mapper(registry, cls, table, mapper_kw):
130 _ImperativeMapperConfig(registry, cls, table, mapper_kw)
131 return cls.__mapper__
134@util.preload_module("sqlalchemy.orm.decl_api")
135def _is_declarative_props(obj):
136 declared_attr = util.preloaded.orm_decl_api.declared_attr
138 return isinstance(obj, (declared_attr, util.classproperty))
141def _check_declared_props_nocascade(obj, name, cls):
142 if _is_declarative_props(obj):
143 if getattr(obj, "_cascading", False):
144 util.warn(
145 "@declared_attr.cascading is not supported on the %s "
146 "attribute on class %s. This attribute invokes for "
147 "subclasses in any case." % (name, cls)
148 )
149 return True
150 else:
151 return False
154class _MapperConfig(object):
155 __slots__ = (
156 "cls",
157 "classname",
158 "properties",
159 "declared_attr_reg",
160 "__weakref__",
161 )
163 @classmethod
164 def setup_mapping(cls, registry, cls_, dict_, table, mapper_kw):
165 manager = attributes.manager_of_class(cls)
166 if manager and manager.class_ is cls_:
167 raise exc.InvalidRequestError(
168 "Class %r already has been " "instrumented declaratively" % cls
169 )
171 if cls_.__dict__.get("__abstract__", False):
172 return
174 defer_map = _get_immediate_cls_attr(
175 cls_, "_sa_decl_prepare_nocascade", strict=True
176 ) or hasattr(cls_, "_sa_decl_prepare")
178 if defer_map:
179 cfg_cls = _DeferredMapperConfig
180 else:
181 cfg_cls = _ClassScanMapperConfig
183 return cfg_cls(registry, cls_, dict_, table, mapper_kw)
185 def __init__(self, registry, cls_, mapper_kw):
186 self.cls = util.assert_arg_type(cls_, type, "cls_")
187 self.classname = cls_.__name__
188 self.properties = util.OrderedDict()
189 self.declared_attr_reg = {}
191 if not mapper_kw.get("non_primary", False):
192 instrumentation.register_class(
193 self.cls,
194 finalize=False,
195 registry=registry,
196 declarative_scan=self,
197 init_method=registry.constructor,
198 )
199 else:
200 manager = attributes.manager_of_class(self.cls)
201 if not manager or not manager.is_mapped:
202 raise exc.InvalidRequestError(
203 "Class %s has no primary mapper configured. Configure "
204 "a primary mapper first before setting up a non primary "
205 "Mapper." % self.cls
206 )
208 def set_cls_attribute(self, attrname, value):
210 manager = instrumentation.manager_of_class(self.cls)
211 manager.install_member(attrname, value)
212 return value
214 def _early_mapping(self, mapper_kw):
215 self.map(mapper_kw)
218class _ImperativeMapperConfig(_MapperConfig):
219 __slots__ = ("dict_", "local_table", "inherits")
221 def __init__(
222 self,
223 registry,
224 cls_,
225 table,
226 mapper_kw,
227 ):
228 super(_ImperativeMapperConfig, self).__init__(
229 registry, cls_, mapper_kw
230 )
232 self.dict_ = {}
233 self.local_table = self.set_cls_attribute("__table__", table)
235 with mapperlib._CONFIGURE_MUTEX:
236 if not mapper_kw.get("non_primary", False):
237 clsregistry.add_class(
238 self.classname, self.cls, registry._class_registry
239 )
241 self._setup_inheritance(mapper_kw)
243 self._early_mapping(mapper_kw)
245 def map(self, mapper_kw=util.EMPTY_DICT):
246 mapper_cls = mapper
248 return self.set_cls_attribute(
249 "__mapper__",
250 mapper_cls(self.cls, self.local_table, **mapper_kw),
251 )
253 def _setup_inheritance(self, mapper_kw):
254 cls = self.cls
256 inherits = mapper_kw.get("inherits", None)
258 if inherits is None:
259 # since we search for classical mappings now, search for
260 # multiple mapped bases as well and raise an error.
261 inherits_search = []
262 for c in cls.__bases__:
263 c = _resolve_for_abstract_or_classical(c)
264 if c is None:
265 continue
266 if _declared_mapping_info(
267 c
268 ) is not None and not _get_immediate_cls_attr(
269 c, "_sa_decl_prepare_nocascade", strict=True
270 ):
271 inherits_search.append(c)
273 if inherits_search:
274 if len(inherits_search) > 1:
275 raise exc.InvalidRequestError(
276 "Class %s has multiple mapped bases: %r"
277 % (cls, inherits_search)
278 )
279 inherits = inherits_search[0]
280 elif isinstance(inherits, mapper):
281 inherits = inherits.class_
283 self.inherits = inherits
286class _ClassScanMapperConfig(_MapperConfig):
287 __slots__ = (
288 "dict_",
289 "local_table",
290 "persist_selectable",
291 "declared_columns",
292 "column_copies",
293 "table_args",
294 "tablename",
295 "mapper_args",
296 "mapper_args_fn",
297 "inherits",
298 )
300 def __init__(
301 self,
302 registry,
303 cls_,
304 dict_,
305 table,
306 mapper_kw,
307 ):
309 # grab class dict before the instrumentation manager has been added.
310 # reduces cycles
311 self.dict_ = dict(dict_) if dict_ else {}
313 super(_ClassScanMapperConfig, self).__init__(registry, cls_, mapper_kw)
315 self.persist_selectable = None
316 self.declared_columns = set()
317 self.column_copies = {}
318 self._setup_declared_events()
320 self._scan_attributes()
322 with mapperlib._CONFIGURE_MUTEX:
323 clsregistry.add_class(
324 self.classname, self.cls, registry._class_registry
325 )
327 self._extract_mappable_attributes()
329 self._extract_declared_columns()
331 self._setup_table(table)
333 self._setup_inheritance(mapper_kw)
335 self._early_mapping(mapper_kw)
337 def _setup_declared_events(self):
338 if _get_immediate_cls_attr(self.cls, "__declare_last__"):
340 @event.listens_for(mapper, "after_configured")
341 def after_configured():
342 self.cls.__declare_last__()
344 if _get_immediate_cls_attr(self.cls, "__declare_first__"):
346 @event.listens_for(mapper, "before_configured")
347 def before_configured():
348 self.cls.__declare_first__()
350 def _cls_attr_override_checker(self, cls):
351 """Produce a function that checks if a class has overridden an
352 attribute, taking SQLAlchemy-enabled dataclass fields into account.
354 """
355 sa_dataclass_metadata_key = _get_immediate_cls_attr(
356 cls, "__sa_dataclass_metadata_key__", None
357 )
359 if sa_dataclass_metadata_key is None:
361 def attribute_is_overridden(key, obj):
362 return getattr(cls, key) is not obj
364 else:
366 all_datacls_fields = {
367 f.name: f.metadata[sa_dataclass_metadata_key]
368 for f in util.dataclass_fields(cls)
369 if sa_dataclass_metadata_key in f.metadata
370 }
371 local_datacls_fields = {
372 f.name: f.metadata[sa_dataclass_metadata_key]
373 for f in util.local_dataclass_fields(cls)
374 if sa_dataclass_metadata_key in f.metadata
375 }
377 absent = object()
379 def attribute_is_overridden(key, obj):
380 if _is_declarative_props(obj):
381 obj = obj.fget
383 # this function likely has some failure modes still if
384 # someone is doing a deep mixing of the same attribute
385 # name as plain Python attribute vs. dataclass field.
387 ret = local_datacls_fields.get(key, absent)
388 if _is_declarative_props(ret):
389 ret = ret.fget
391 if ret is obj:
392 return False
393 elif ret is not absent:
394 return True
396 all_field = all_datacls_fields.get(key, absent)
398 ret = getattr(cls, key, obj)
400 if ret is obj:
401 return False
403 # for dataclasses, this could be the
404 # 'default' of the field. so filter more specifically
405 # for an already-mapped InstrumentedAttribute
406 if ret is not absent and isinstance(
407 ret, InstrumentedAttribute
408 ):
409 return True
411 if all_field is obj:
412 return False
413 elif all_field is not absent:
414 return True
416 # can't find another attribute
417 return False
419 return attribute_is_overridden
421 def _cls_attr_resolver(self, cls):
422 """produce a function to iterate the "attributes" of a class,
423 adjusting for SQLAlchemy fields embedded in dataclass fields.
425 """
426 sa_dataclass_metadata_key = _get_immediate_cls_attr(
427 cls, "__sa_dataclass_metadata_key__", None
428 )
430 if sa_dataclass_metadata_key is None:
432 def local_attributes_for_class():
433 for name, obj in vars(cls).items():
434 yield name, obj, False
436 else:
437 field_names = set()
439 def local_attributes_for_class():
440 for field in util.local_dataclass_fields(cls):
441 if sa_dataclass_metadata_key in field.metadata:
442 field_names.add(field.name)
443 yield field.name, _as_dc_declaredattr(
444 field.metadata, sa_dataclass_metadata_key
445 ), True
446 for name, obj in vars(cls).items():
447 if name not in field_names:
448 yield name, obj, False
450 return local_attributes_for_class
452 def _scan_attributes(self):
453 cls = self.cls
454 dict_ = self.dict_
455 column_copies = self.column_copies
456 mapper_args_fn = None
457 table_args = inherited_table_args = None
458 tablename = None
460 attribute_is_overridden = self._cls_attr_override_checker(self.cls)
462 bases = []
464 for base in cls.__mro__:
465 # collect bases and make sure standalone columns are copied
466 # to be the column they will ultimately be on the class,
467 # so that declared_attr functions use the right columns.
468 # need to do this all the way up the hierarchy first
469 # (see #8190)
471 class_mapped = (
472 base is not cls
473 and _declared_mapping_info(base) is not None
474 and not _get_immediate_cls_attr(
475 base, "_sa_decl_prepare_nocascade", strict=True
476 )
477 )
479 local_attributes_for_class = self._cls_attr_resolver(base)
481 if not class_mapped and base is not cls:
482 locally_collected_columns = self._produce_column_copies(
483 local_attributes_for_class,
484 attribute_is_overridden,
485 )
486 else:
487 locally_collected_columns = {}
489 bases.append(
490 (
491 base,
492 class_mapped,
493 local_attributes_for_class,
494 locally_collected_columns,
495 )
496 )
498 for (
499 base,
500 class_mapped,
501 local_attributes_for_class,
502 locally_collected_columns,
503 ) in bases:
505 # this transfer can also take place as we scan each name
506 # for finer-grained control of how collected_attributes is
507 # populated, as this is what impacts column ordering.
508 # however it's simpler to get it out of the way here.
509 dict_.update(locally_collected_columns)
511 for name, obj, is_dataclass in local_attributes_for_class():
512 if name == "__mapper_args__":
513 check_decl = _check_declared_props_nocascade(
514 obj, name, cls
515 )
516 if not mapper_args_fn and (not class_mapped or check_decl):
517 # don't even invoke __mapper_args__ until
518 # after we've determined everything about the
519 # mapped table.
520 # make a copy of it so a class-level dictionary
521 # is not overwritten when we update column-based
522 # arguments.
523 def mapper_args_fn():
524 return dict(cls.__mapper_args__)
526 elif name == "__tablename__":
527 check_decl = _check_declared_props_nocascade(
528 obj, name, cls
529 )
530 if not tablename and (not class_mapped or check_decl):
531 tablename = cls.__tablename__
532 elif name == "__table_args__":
533 check_decl = _check_declared_props_nocascade(
534 obj, name, cls
535 )
536 if not table_args and (not class_mapped or check_decl):
537 table_args = cls.__table_args__
538 if not isinstance(
539 table_args, (tuple, dict, type(None))
540 ):
541 raise exc.ArgumentError(
542 "__table_args__ value must be a tuple, "
543 "dict, or None"
544 )
545 if base is not cls:
546 inherited_table_args = True
547 elif class_mapped:
548 if _is_declarative_props(obj):
549 util.warn(
550 "Regular (i.e. not __special__) "
551 "attribute '%s.%s' uses @declared_attr, "
552 "but owning class %s is mapped - "
553 "not applying to subclass %s."
554 % (base.__name__, name, base, cls)
555 )
556 continue
557 elif base is not cls:
558 # we're a mixin, abstract base, or something that is
559 # acting like that for now.
560 if isinstance(obj, Column):
561 # already copied columns to the mapped class.
562 continue
563 elif isinstance(obj, MapperProperty):
564 raise exc.InvalidRequestError(
565 "Mapper properties (i.e. deferred,"
566 "column_property(), relationship(), etc.) must "
567 "be declared as @declared_attr callables "
568 "on declarative mixin classes. For dataclass "
569 "field() objects, use a lambda:"
570 )
571 elif _is_declarative_props(obj):
572 if obj._cascading:
573 if name in dict_:
574 # unfortunately, while we can use the user-
575 # defined attribute here to allow a clean
576 # override, if there's another
577 # subclass below then it still tries to use
578 # this. not sure if there is enough
579 # information here to add this as a feature
580 # later on.
581 util.warn(
582 "Attribute '%s' on class %s cannot be "
583 "processed due to "
584 "@declared_attr.cascading; "
585 "skipping" % (name, cls)
586 )
587 dict_[name] = column_copies[
588 obj
589 ] = ret = obj.__get__(obj, cls)
590 setattr(cls, name, ret)
591 else:
592 if is_dataclass:
593 # access attribute using normal class access
594 # first, to see if it's been mapped on a
595 # superclass. note if the dataclasses.field()
596 # has "default", this value can be anything.
597 ret = getattr(cls, name, None)
599 # so, if it's anything that's not ORM
600 # mapped, assume we should invoke the
601 # declared_attr
602 if not isinstance(ret, InspectionAttr):
603 ret = obj.fget()
604 else:
605 # access attribute using normal class access.
606 # if the declared attr already took place
607 # on a superclass that is mapped, then
608 # this is no longer a declared_attr, it will
609 # be the InstrumentedAttribute
610 ret = getattr(cls, name)
612 # correct for proxies created from hybrid_property
613 # or similar. note there is no known case that
614 # produces nested proxies, so we are only
615 # looking one level deep right now.
616 if (
617 isinstance(ret, InspectionAttr)
618 and ret._is_internal_proxy
619 and not isinstance(
620 ret.original_property, MapperProperty
621 )
622 ):
623 ret = ret.descriptor
625 dict_[name] = column_copies[obj] = ret
626 if (
627 isinstance(ret, (Column, MapperProperty))
628 and ret.doc is None
629 ):
630 ret.doc = obj.__doc__
631 # here, the attribute is some other kind of property that
632 # we assume is not part of the declarative mapping.
633 # however, check for some more common mistakes
634 else:
635 self._warn_for_decl_attributes(base, name, obj)
636 elif is_dataclass and (
637 name not in dict_ or dict_[name] is not obj
638 ):
639 # here, we are definitely looking at the target class
640 # and not a superclass. this is currently a
641 # dataclass-only path. if the name is only
642 # a dataclass field and isn't in local cls.__dict__,
643 # put the object there.
644 # assert that the dataclass-enabled resolver agrees
645 # with what we are seeing
647 assert not attribute_is_overridden(name, obj)
649 if _is_declarative_props(obj):
650 obj = obj.fget()
652 dict_[name] = obj
654 if inherited_table_args and not tablename:
655 table_args = None
657 self.table_args = table_args
658 self.tablename = tablename
659 self.mapper_args_fn = mapper_args_fn
661 def _warn_for_decl_attributes(self, cls, key, c):
662 if isinstance(c, expression.ColumnClause):
663 util.warn(
664 "Attribute '%s' on class %s appears to be a non-schema "
665 "'sqlalchemy.sql.column()' "
666 "object; this won't be part of the declarative mapping"
667 % (key, cls)
668 )
670 def _produce_column_copies(
671 self, attributes_for_class, attribute_is_overridden
672 ):
673 cls = self.cls
674 dict_ = self.dict_
675 locally_collected_attributes = {}
676 column_copies = self.column_copies
677 # copy mixin columns to the mapped class
679 for name, obj, is_dataclass in attributes_for_class():
680 if isinstance(obj, Column):
681 if attribute_is_overridden(name, obj):
682 # if column has been overridden
683 # (like by the InstrumentedAttribute of the
684 # superclass), skip
685 continue
686 elif obj.foreign_keys:
687 raise exc.InvalidRequestError(
688 "Columns with foreign keys to other columns "
689 "must be declared as @declared_attr callables "
690 "on declarative mixin classes. For dataclass "
691 "field() objects, use a lambda:."
692 )
693 elif name not in dict_ and not (
694 "__table__" in dict_
695 and (obj.name or name) in dict_["__table__"].c
696 ):
697 column_copies[obj] = copy_ = obj._copy()
698 copy_._creation_order = obj._creation_order
699 setattr(cls, name, copy_)
700 locally_collected_attributes[name] = copy_
701 return locally_collected_attributes
703 def _extract_mappable_attributes(self):
704 cls = self.cls
705 dict_ = self.dict_
707 our_stuff = self.properties
709 late_mapped = _get_immediate_cls_attr(
710 cls, "_sa_decl_prepare_nocascade", strict=True
711 )
713 for k in list(dict_):
715 if k in ("__table__", "__tablename__", "__mapper_args__"):
716 continue
718 value = dict_[k]
719 if _is_declarative_props(value):
720 if value._cascading:
721 util.warn(
722 "Use of @declared_attr.cascading only applies to "
723 "Declarative 'mixin' and 'abstract' classes. "
724 "Currently, this flag is ignored on mapped class "
725 "%s" % self.cls
726 )
728 value = getattr(cls, k)
730 elif (
731 isinstance(value, QueryableAttribute)
732 and value.class_ is not cls
733 and value.key != k
734 ):
735 # detect a QueryableAttribute that's already mapped being
736 # assigned elsewhere in userland, turn into a synonym()
737 value = SynonymProperty(value.key)
738 setattr(cls, k, value)
740 if (
741 isinstance(value, tuple)
742 and len(value) == 1
743 and isinstance(value[0], (Column, MapperProperty))
744 ):
745 util.warn(
746 "Ignoring declarative-like tuple value of attribute "
747 "'%s': possibly a copy-and-paste error with a comma "
748 "accidentally placed at the end of the line?" % k
749 )
750 continue
751 elif not isinstance(value, (Column, MapperProperty)):
752 # using @declared_attr for some object that
753 # isn't Column/MapperProperty; remove from the dict_
754 # and place the evaluated value onto the class.
755 if not k.startswith("__"):
756 dict_.pop(k)
757 self._warn_for_decl_attributes(cls, k, value)
758 if not late_mapped:
759 setattr(cls, k, value)
760 continue
761 # we expect to see the name 'metadata' in some valid cases;
762 # however at this point we see it's assigned to something trying
763 # to be mapped, so raise for that.
764 elif k == "metadata":
765 raise exc.InvalidRequestError(
766 "Attribute name 'metadata' is reserved "
767 "for the MetaData instance when using a "
768 "declarative base class."
769 )
770 our_stuff[k] = value
772 def _extract_declared_columns(self):
773 our_stuff = self.properties
775 # set up attributes in the order they were created
776 util.sort_dictionary(
777 our_stuff, key=lambda key: our_stuff[key]._creation_order
778 )
780 # extract columns from the class dict
781 declared_columns = self.declared_columns
782 name_to_prop_key = collections.defaultdict(set)
783 for key, c in list(our_stuff.items()):
784 if isinstance(c, (ColumnProperty, CompositeProperty)):
785 for col in c.columns:
786 if isinstance(col, Column) and col.table is None:
787 _undefer_column_name(key, col)
788 if not isinstance(c, CompositeProperty):
789 name_to_prop_key[col.name].add(key)
790 declared_columns.add(col)
791 elif isinstance(c, Column):
792 _undefer_column_name(key, c)
793 name_to_prop_key[c.name].add(key)
794 declared_columns.add(c)
795 # if the column is the same name as the key,
796 # remove it from the explicit properties dict.
797 # the normal rules for assigning column-based properties
798 # will take over, including precedence of columns
799 # in multi-column ColumnProperties.
800 if key == c.key:
801 del our_stuff[key]
803 for name, keys in name_to_prop_key.items():
804 if len(keys) > 1:
805 util.warn(
806 "On class %r, Column object %r named "
807 "directly multiple times, "
808 "only one will be used: %s. "
809 "Consider using orm.synonym instead"
810 % (self.classname, name, (", ".join(sorted(keys))))
811 )
813 def _setup_table(self, table=None):
814 cls = self.cls
815 tablename = self.tablename
816 table_args = self.table_args
817 dict_ = self.dict_
818 declared_columns = self.declared_columns
820 manager = attributes.manager_of_class(cls)
822 declared_columns = self.declared_columns = sorted(
823 declared_columns, key=lambda c: c._creation_order
824 )
826 if "__table__" not in dict_ and table is None:
827 if hasattr(cls, "__table_cls__"):
828 table_cls = util.unbound_method_to_callable(cls.__table_cls__)
829 else:
830 table_cls = Table
832 if tablename is not None:
834 args, table_kw = (), {}
835 if table_args:
836 if isinstance(table_args, dict):
837 table_kw = table_args
838 elif isinstance(table_args, tuple):
839 if isinstance(table_args[-1], dict):
840 args, table_kw = table_args[0:-1], table_args[-1]
841 else:
842 args = table_args
844 autoload_with = dict_.get("__autoload_with__")
845 if autoload_with:
846 table_kw["autoload_with"] = autoload_with
848 autoload = dict_.get("__autoload__")
849 if autoload:
850 table_kw["autoload"] = True
852 table = self.set_cls_attribute(
853 "__table__",
854 table_cls(
855 tablename,
856 self._metadata_for_cls(manager),
857 *(tuple(declared_columns) + tuple(args)),
858 **table_kw
859 ),
860 )
861 else:
862 if table is None:
863 table = cls.__table__
864 if declared_columns:
865 for c in declared_columns:
866 if not table.c.contains_column(c):
867 raise exc.ArgumentError(
868 "Can't add additional column %r when "
869 "specifying __table__" % c.key
870 )
871 self.local_table = table
873 def _metadata_for_cls(self, manager):
874 if hasattr(self.cls, "metadata"):
875 return self.cls.metadata
876 else:
877 return manager.registry.metadata
879 def _setup_inheritance(self, mapper_kw):
880 table = self.local_table
881 cls = self.cls
882 table_args = self.table_args
883 declared_columns = self.declared_columns
885 inherits = mapper_kw.get("inherits", None)
887 if inherits is None:
888 # since we search for classical mappings now, search for
889 # multiple mapped bases as well and raise an error.
890 inherits_search = []
891 for c in cls.__bases__:
892 c = _resolve_for_abstract_or_classical(c)
893 if c is None:
894 continue
895 if _declared_mapping_info(
896 c
897 ) is not None and not _get_immediate_cls_attr(
898 c, "_sa_decl_prepare_nocascade", strict=True
899 ):
900 if c not in inherits_search:
901 inherits_search.append(c)
903 if inherits_search:
904 if len(inherits_search) > 1:
905 raise exc.InvalidRequestError(
906 "Class %s has multiple mapped bases: %r"
907 % (cls, inherits_search)
908 )
909 inherits = inherits_search[0]
910 elif isinstance(inherits, mapper):
911 inherits = inherits.class_
913 self.inherits = inherits
915 if (
916 table is None
917 and self.inherits is None
918 and not _get_immediate_cls_attr(cls, "__no_table__")
919 ):
921 raise exc.InvalidRequestError(
922 "Class %r does not have a __table__ or __tablename__ "
923 "specified and does not inherit from an existing "
924 "table-mapped class." % cls
925 )
926 elif self.inherits:
927 inherited_mapper = _declared_mapping_info(self.inherits)
928 inherited_table = inherited_mapper.local_table
929 inherited_persist_selectable = inherited_mapper.persist_selectable
931 if table is None:
932 # single table inheritance.
933 # ensure no table args
934 if table_args:
935 raise exc.ArgumentError(
936 "Can't place __table_args__ on an inherited class "
937 "with no table."
938 )
939 # add any columns declared here to the inherited table.
940 for c in declared_columns:
941 if c.name in inherited_table.c:
942 if inherited_table.c[c.name] is c:
943 continue
944 raise exc.ArgumentError(
945 "Column '%s' on class %s conflicts with "
946 "existing column '%s'"
947 % (c, cls, inherited_table.c[c.name])
948 )
949 if c.primary_key:
950 raise exc.ArgumentError(
951 "Can't place primary key columns on an inherited "
952 "class with no table."
953 )
954 inherited_table.append_column(c)
955 if (
956 inherited_persist_selectable is not None
957 and inherited_persist_selectable is not inherited_table
958 ):
959 inherited_persist_selectable._refresh_for_new_column(c)
961 def _prepare_mapper_arguments(self, mapper_kw):
962 properties = self.properties
964 if self.mapper_args_fn:
965 mapper_args = self.mapper_args_fn()
966 else:
967 mapper_args = {}
969 if mapper_kw:
970 mapper_args.update(mapper_kw)
972 if "properties" in mapper_args:
973 properties = dict(properties)
974 properties.update(mapper_args["properties"])
976 # make sure that column copies are used rather
977 # than the original columns from any mixins
978 for k in ("version_id_col", "polymorphic_on"):
979 if k in mapper_args:
980 v = mapper_args[k]
981 mapper_args[k] = self.column_copies.get(v, v)
983 if "inherits" in mapper_args:
984 inherits_arg = mapper_args["inherits"]
985 if isinstance(inherits_arg, mapper):
986 inherits_arg = inherits_arg.class_
988 if inherits_arg is not self.inherits:
989 raise exc.InvalidRequestError(
990 "mapper inherits argument given for non-inheriting "
991 "class %s" % (mapper_args["inherits"])
992 )
994 if self.inherits:
995 mapper_args["inherits"] = self.inherits
997 if self.inherits and not mapper_args.get("concrete", False):
998 # single or joined inheritance
999 # exclude any cols on the inherited table which are
1000 # not mapped on the parent class, to avoid
1001 # mapping columns specific to sibling/nephew classes
1002 inherited_mapper = _declared_mapping_info(self.inherits)
1003 inherited_table = inherited_mapper.local_table
1005 if "exclude_properties" not in mapper_args:
1006 mapper_args["exclude_properties"] = exclude_properties = set(
1007 [
1008 c.key
1009 for c in inherited_table.c
1010 if c not in inherited_mapper._columntoproperty
1011 ]
1012 ).union(inherited_mapper.exclude_properties or ())
1013 exclude_properties.difference_update(
1014 [c.key for c in self.declared_columns]
1015 )
1017 # look through columns in the current mapper that
1018 # are keyed to a propname different than the colname
1019 # (if names were the same, we'd have popped it out above,
1020 # in which case the mapper makes this combination).
1021 # See if the superclass has a similar column property.
1022 # If so, join them together.
1023 for k, col in list(properties.items()):
1024 if not isinstance(col, expression.ColumnElement):
1025 continue
1026 if k in inherited_mapper._props:
1027 p = inherited_mapper._props[k]
1028 if isinstance(p, ColumnProperty):
1029 # note here we place the subclass column
1030 # first. See [ticket:1892] for background.
1031 properties[k] = [col] + p.columns
1032 result_mapper_args = mapper_args.copy()
1033 result_mapper_args["properties"] = properties
1034 self.mapper_args = result_mapper_args
1036 def map(self, mapper_kw=util.EMPTY_DICT):
1037 self._prepare_mapper_arguments(mapper_kw)
1038 if hasattr(self.cls, "__mapper_cls__"):
1039 mapper_cls = util.unbound_method_to_callable(
1040 self.cls.__mapper_cls__
1041 )
1042 else:
1043 mapper_cls = mapper
1045 return self.set_cls_attribute(
1046 "__mapper__",
1047 mapper_cls(self.cls, self.local_table, **self.mapper_args),
1048 )
1051@util.preload_module("sqlalchemy.orm.decl_api")
1052def _as_dc_declaredattr(field_metadata, sa_dataclass_metadata_key):
1053 # wrap lambdas inside dataclass fields inside an ad-hoc declared_attr.
1054 # we can't write it because field.metadata is immutable :( so we have
1055 # to go through extra trouble to compare these
1056 decl_api = util.preloaded.orm_decl_api
1057 obj = field_metadata[sa_dataclass_metadata_key]
1058 if callable(obj) and not isinstance(obj, decl_api.declared_attr):
1059 return decl_api.declared_attr(obj)
1060 else:
1061 return obj
1064class _DeferredMapperConfig(_ClassScanMapperConfig):
1065 _configs = util.OrderedDict()
1067 def _early_mapping(self, mapper_kw):
1068 pass
1070 @property
1071 def cls(self):
1072 return self._cls()
1074 @cls.setter
1075 def cls(self, class_):
1076 self._cls = weakref.ref(class_, self._remove_config_cls)
1077 self._configs[self._cls] = self
1079 @classmethod
1080 def _remove_config_cls(cls, ref):
1081 cls._configs.pop(ref, None)
1083 @classmethod
1084 def has_cls(cls, class_):
1085 # 2.6 fails on weakref if class_ is an old style class
1086 return isinstance(class_, type) and weakref.ref(class_) in cls._configs
1088 @classmethod
1089 def raise_unmapped_for_cls(cls, class_):
1090 if hasattr(class_, "_sa_raise_deferred_config"):
1091 class_._sa_raise_deferred_config()
1093 raise orm_exc.UnmappedClassError(
1094 class_,
1095 msg="Class %s has a deferred mapping on it. It is not yet "
1096 "usable as a mapped class." % orm_exc._safe_cls_name(class_),
1097 )
1099 @classmethod
1100 def config_for_cls(cls, class_):
1101 return cls._configs[weakref.ref(class_)]
1103 @classmethod
1104 def classes_for_base(cls, base_cls, sort=True):
1105 classes_for_base = [
1106 m
1107 for m, cls_ in [(m, m.cls) for m in cls._configs.values()]
1108 if cls_ is not None and issubclass(cls_, base_cls)
1109 ]
1111 if not sort:
1112 return classes_for_base
1114 all_m_by_cls = dict((m.cls, m) for m in classes_for_base)
1116 tuples = []
1117 for m_cls in all_m_by_cls:
1118 tuples.extend(
1119 (all_m_by_cls[base_cls], all_m_by_cls[m_cls])
1120 for base_cls in m_cls.__bases__
1121 if base_cls in all_m_by_cls
1122 )
1123 return list(topological.sort(tuples, classes_for_base))
1125 def map(self, mapper_kw=util.EMPTY_DICT):
1126 self._configs.pop(self._cls, None)
1127 return super(_DeferredMapperConfig, self).map(mapper_kw)
1130def _add_attribute(cls, key, value):
1131 """add an attribute to an existing declarative class.
1133 This runs through the logic to determine MapperProperty,
1134 adds it to the Mapper, adds a column to the mapped Table, etc.
1136 """
1138 if "__mapper__" in cls.__dict__:
1139 if isinstance(value, Column):
1140 _undefer_column_name(key, value)
1141 cls.__table__.append_column(value, replace_existing=True)
1142 cls.__mapper__.add_property(key, value)
1143 elif isinstance(value, ColumnProperty):
1144 for col in value.columns:
1145 if isinstance(col, Column) and col.table is None:
1146 _undefer_column_name(key, col)
1147 cls.__table__.append_column(col, replace_existing=True)
1148 cls.__mapper__.add_property(key, value)
1149 elif isinstance(value, MapperProperty):
1150 cls.__mapper__.add_property(key, value)
1151 elif isinstance(value, QueryableAttribute) and value.key != key:
1152 # detect a QueryableAttribute that's already mapped being
1153 # assigned elsewhere in userland, turn into a synonym()
1154 value = SynonymProperty(value.key)
1155 cls.__mapper__.add_property(key, value)
1156 else:
1157 type.__setattr__(cls, key, value)
1158 cls.__mapper__._expire_memoizations()
1159 else:
1160 type.__setattr__(cls, key, value)
1163def _del_attribute(cls, key):
1165 if (
1166 "__mapper__" in cls.__dict__
1167 and key in cls.__dict__
1168 and not cls.__mapper__._dispose_called
1169 ):
1170 value = cls.__dict__[key]
1171 if isinstance(
1172 value, (Column, ColumnProperty, MapperProperty, QueryableAttribute)
1173 ):
1174 raise NotImplementedError(
1175 "Can't un-map individual mapped attributes on a mapped class."
1176 )
1177 else:
1178 type.__delattr__(cls, key)
1179 cls.__mapper__._expire_memoizations()
1180 else:
1181 type.__delattr__(cls, key)
1184def _declarative_constructor(self, **kwargs):
1185 """A simple constructor that allows initialization from kwargs.
1187 Sets attributes on the constructed instance using the names and
1188 values in ``kwargs``.
1190 Only keys that are present as
1191 attributes of the instance's class are allowed. These could be,
1192 for example, any mapped columns or relationships.
1193 """
1194 cls_ = type(self)
1195 for k in kwargs:
1196 if not hasattr(cls_, k):
1197 raise TypeError(
1198 "%r is an invalid keyword argument for %s" % (k, cls_.__name__)
1199 )
1200 setattr(self, k, kwargs[k])
1203_declarative_constructor.__name__ = "__init__"
1206def _undefer_column_name(key, column):
1207 if column.key is None:
1208 column.key = key
1209 if column.name is None:
1210 column.name = key