Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/decl_base.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

885 statements  

1# orm/decl_base.py 

2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Internal implementation for declarative.""" 

9 

10from __future__ import annotations 

11 

12import collections 

13import dataclasses 

14import re 

15from typing import Any 

16from typing import Callable 

17from typing import cast 

18from typing import Dict 

19from typing import Iterable 

20from typing import List 

21from typing import Mapping 

22from typing import NamedTuple 

23from typing import NoReturn 

24from typing import Optional 

25from typing import Protocol 

26from typing import Sequence 

27from typing import Tuple 

28from typing import Type 

29from typing import TYPE_CHECKING 

30from typing import TypedDict 

31from typing import TypeVar 

32from typing import Union 

33import weakref 

34 

35from . import attributes 

36from . import clsregistry 

37from . import exc as orm_exc 

38from . import instrumentation 

39from . import mapperlib 

40from ._typing import _O 

41from ._typing import attr_is_internal_proxy 

42from .attributes import InstrumentedAttribute 

43from .attributes import QueryableAttribute 

44from .base import _is_mapped_class 

45from .base import InspectionAttr 

46from .descriptor_props import CompositeProperty 

47from .descriptor_props import SynonymProperty 

48from .interfaces import _AttributeOptions 

49from .interfaces import _DCAttributeOptions 

50from .interfaces import _IntrospectsAnnotations 

51from .interfaces import _MappedAttribute 

52from .interfaces import _MapsColumns 

53from .interfaces import MapperProperty 

54from .mapper import Mapper 

55from .properties import ColumnProperty 

56from .properties import MappedColumn 

57from .util import _extract_mapped_subtype 

58from .util import _is_mapped_annotation 

59from .util import class_mapper 

60from .util import de_stringify_annotation 

61from .. import event 

62from .. import exc 

63from .. import util 

64from ..sql import expression 

65from ..sql.base import _NoArg 

66from ..sql.schema import Column 

67from ..sql.schema import Table 

68from ..util import topological 

69from ..util.typing import _AnnotationScanType 

70from ..util.typing import is_fwd_ref 

71from ..util.typing import is_literal 

72from ..util.typing import typing_get_args 

73 

74if TYPE_CHECKING: 

75 from ._typing import _ClassDict 

76 from ._typing import _RegistryType 

77 from .base import Mapped 

78 from .decl_api import declared_attr 

79 from .instrumentation import ClassManager 

80 from ..sql.elements import NamedColumn 

81 from ..sql.schema import MetaData 

82 from ..sql.selectable import FromClause 

83 

84_T = TypeVar("_T", bound=Any) 

85 

86_MapperKwArgs = Mapping[str, Any] 

87_TableArgsType = Union[Tuple[Any, ...], Dict[str, Any]] 

88 

89 

90class MappedClassProtocol(Protocol[_O]): 

91 """A protocol representing a SQLAlchemy mapped class. 

92 

93 The protocol is generic on the type of class, use 

94 ``MappedClassProtocol[Any]`` to allow any mapped class. 

95 """ 

96 

97 __name__: str 

98 __mapper__: Mapper[_O] 

99 __table__: FromClause 

100 

101 def __call__(self, **kw: Any) -> _O: ... 

102 

103 

104class _DeclMappedClassProtocol(MappedClassProtocol[_O], Protocol): 

105 "Internal more detailed version of ``MappedClassProtocol``." 

106 metadata: MetaData 

107 __tablename__: str 

108 __mapper_args__: _MapperKwArgs 

109 __table_args__: Optional[_TableArgsType] 

110 

111 _sa_apply_dc_transforms: Optional[_DataclassArguments] 

112 

113 def __declare_first__(self) -> None: ... 

114 

115 def __declare_last__(self) -> None: ... 

116 

117 

118class _DataclassArguments(TypedDict): 

119 init: Union[_NoArg, bool] 

120 repr: Union[_NoArg, bool] 

121 eq: Union[_NoArg, bool] 

122 order: Union[_NoArg, bool] 

123 unsafe_hash: Union[_NoArg, bool] 

124 match_args: Union[_NoArg, bool] 

125 kw_only: Union[_NoArg, bool] 

126 dataclass_callable: Union[_NoArg, Callable[..., Type[Any]]] 

127 

128 

129def _declared_mapping_info( 

130 cls: Type[Any], 

131) -> Optional[Union[_DeferredMapperConfig, Mapper[Any]]]: 

132 # deferred mapping 

133 if _DeferredMapperConfig.has_cls(cls): 

134 return _DeferredMapperConfig.config_for_cls(cls) 

135 # regular mapping 

136 elif _is_mapped_class(cls): 

137 return class_mapper(cls, configure=False) 

138 else: 

139 return None 

140 

141 

142def _is_supercls_for_inherits(cls: Type[Any]) -> bool: 

143 """return True if this class will be used as a superclass to set in 

144 'inherits'. 

145 

146 This includes deferred mapper configs that aren't mapped yet, however does 

147 not include classes with _sa_decl_prepare_nocascade (e.g. 

148 ``AbstractConcreteBase``); these concrete-only classes are not set up as 

149 "inherits" until after mappers are configured using 

150 mapper._set_concrete_base() 

151 

152 """ 

153 if _DeferredMapperConfig.has_cls(cls): 

154 return not _get_immediate_cls_attr( 

155 cls, "_sa_decl_prepare_nocascade", strict=True 

156 ) 

157 # regular mapping 

158 elif _is_mapped_class(cls): 

159 return True 

160 else: 

161 return False 

162 

163 

164def _resolve_for_abstract_or_classical(cls: Type[Any]) -> Optional[Type[Any]]: 

165 if cls is object: 

166 return None 

167 

168 sup: Optional[Type[Any]] 

169 

170 if cls.__dict__.get("__abstract__", False): 

171 for base_ in cls.__bases__: 

172 sup = _resolve_for_abstract_or_classical(base_) 

173 if sup is not None: 

174 return sup 

175 else: 

176 return None 

177 else: 

178 clsmanager = _dive_for_cls_manager(cls) 

179 

180 if clsmanager: 

181 return clsmanager.class_ 

182 else: 

183 return cls 

184 

185 

186def _get_immediate_cls_attr( 

187 cls: Type[Any], attrname: str, strict: bool = False 

188) -> Optional[Any]: 

189 """return an attribute of the class that is either present directly 

190 on the class, e.g. not on a superclass, or is from a superclass but 

191 this superclass is a non-mapped mixin, that is, not a descendant of 

192 the declarative base and is also not classically mapped. 

193 

194 This is used to detect attributes that indicate something about 

195 a mapped class independently from any mapped classes that it may 

196 inherit from. 

197 

198 """ 

199 

200 # the rules are different for this name than others, 

201 # make sure we've moved it out. transitional 

202 assert attrname != "__abstract__" 

203 

204 if not issubclass(cls, object): 

205 return None 

206 

207 if attrname in cls.__dict__: 

208 return getattr(cls, attrname) 

209 

210 for base in cls.__mro__[1:]: 

211 _is_classical_inherits = _dive_for_cls_manager(base) is not None 

212 

213 if attrname in base.__dict__ and ( 

214 base is cls 

215 or ( 

216 (base in cls.__bases__ if strict else True) 

217 and not _is_classical_inherits 

218 ) 

219 ): 

220 return getattr(base, attrname) 

221 else: 

222 return None 

223 

224 

225def _dive_for_cls_manager(cls: Type[_O]) -> Optional[ClassManager[_O]]: 

226 # because the class manager registration is pluggable, 

227 # we need to do the search for every class in the hierarchy, 

228 # rather than just a simple "cls._sa_class_manager" 

229 

230 for base in cls.__mro__: 

231 manager: Optional[ClassManager[_O]] = attributes.opt_manager_of_class( 

232 base 

233 ) 

234 if manager: 

235 return manager 

236 return None 

237 

238 

239def _as_declarative( 

240 registry: _RegistryType, cls: Type[Any], dict_: _ClassDict 

241) -> Optional[_MapperConfig]: 

242 # declarative scans the class for attributes. no table or mapper 

243 # args passed separately. 

244 return _MapperConfig.setup_mapping(registry, cls, dict_, None, {}) 

245 

246 

247def _mapper( 

248 registry: _RegistryType, 

249 cls: Type[_O], 

250 table: Optional[FromClause], 

251 mapper_kw: _MapperKwArgs, 

252) -> Mapper[_O]: 

253 _ImperativeMapperConfig(registry, cls, table, mapper_kw) 

254 return cast("MappedClassProtocol[_O]", cls).__mapper__ 

255 

256 

257@util.preload_module("sqlalchemy.orm.decl_api") 

258def _is_declarative_props(obj: Any) -> bool: 

259 _declared_attr_common = util.preloaded.orm_decl_api._declared_attr_common 

260 

261 return isinstance(obj, (_declared_attr_common, util.classproperty)) 

262 

263 

264def _check_declared_props_nocascade( 

265 obj: Any, name: str, cls: Type[_O] 

266) -> bool: 

267 if _is_declarative_props(obj): 

268 if getattr(obj, "_cascading", False): 

269 util.warn( 

270 "@declared_attr.cascading is not supported on the %s " 

271 "attribute on class %s. This attribute invokes for " 

272 "subclasses in any case." % (name, cls) 

273 ) 

274 return True 

275 else: 

276 return False 

277 

278 

279class _MapperConfig: 

280 __slots__ = ( 

281 "cls", 

282 "classname", 

283 "properties", 

284 "declared_attr_reg", 

285 "__weakref__", 

286 ) 

287 

288 cls: Type[Any] 

289 classname: str 

290 properties: util.OrderedDict[ 

291 str, 

292 Union[ 

293 Sequence[NamedColumn[Any]], NamedColumn[Any], MapperProperty[Any] 

294 ], 

295 ] 

296 declared_attr_reg: Dict[declared_attr[Any], Any] 

297 

298 @classmethod 

299 def setup_mapping( 

300 cls, 

301 registry: _RegistryType, 

302 cls_: Type[_O], 

303 dict_: _ClassDict, 

304 table: Optional[FromClause], 

305 mapper_kw: _MapperKwArgs, 

306 ) -> Optional[_MapperConfig]: 

307 manager = attributes.opt_manager_of_class(cls) 

308 if manager and manager.class_ is cls_: 

309 raise exc.InvalidRequestError( 

310 f"Class {cls!r} already has been instrumented declaratively" 

311 ) 

312 

313 if cls_.__dict__.get("__abstract__", False): 

314 return None 

315 

316 defer_map = _get_immediate_cls_attr( 

317 cls_, "_sa_decl_prepare_nocascade", strict=True 

318 ) or hasattr(cls_, "_sa_decl_prepare") 

319 

320 if defer_map: 

321 return _DeferredMapperConfig( 

322 registry, cls_, dict_, table, mapper_kw 

323 ) 

324 else: 

325 return _ClassScanMapperConfig( 

326 registry, cls_, dict_, table, mapper_kw 

327 ) 

328 

329 def __init__( 

330 self, 

331 registry: _RegistryType, 

332 cls_: Type[Any], 

333 mapper_kw: _MapperKwArgs, 

334 ): 

335 self.cls = util.assert_arg_type(cls_, type, "cls_") 

336 self.classname = cls_.__name__ 

337 self.properties = util.OrderedDict() 

338 self.declared_attr_reg = {} 

339 

340 if not mapper_kw.get("non_primary", False): 

341 instrumentation.register_class( 

342 self.cls, 

343 finalize=False, 

344 registry=registry, 

345 declarative_scan=self, 

346 init_method=registry.constructor, 

347 ) 

348 else: 

349 manager = attributes.opt_manager_of_class(self.cls) 

350 if not manager or not manager.is_mapped: 

351 raise exc.InvalidRequestError( 

352 "Class %s has no primary mapper configured. Configure " 

353 "a primary mapper first before setting up a non primary " 

354 "Mapper." % self.cls 

355 ) 

356 

357 def set_cls_attribute(self, attrname: str, value: _T) -> _T: 

358 manager = instrumentation.manager_of_class(self.cls) 

359 manager.install_member(attrname, value) 

360 return value 

361 

362 def map(self, mapper_kw: _MapperKwArgs = ...) -> Mapper[Any]: 

363 raise NotImplementedError() 

364 

365 def _early_mapping(self, mapper_kw: _MapperKwArgs) -> None: 

366 self.map(mapper_kw) 

367 

368 

369class _ImperativeMapperConfig(_MapperConfig): 

370 __slots__ = ("local_table", "inherits") 

371 

372 def __init__( 

373 self, 

374 registry: _RegistryType, 

375 cls_: Type[_O], 

376 table: Optional[FromClause], 

377 mapper_kw: _MapperKwArgs, 

378 ): 

379 super().__init__(registry, cls_, mapper_kw) 

380 

381 self.local_table = self.set_cls_attribute("__table__", table) 

382 

383 with mapperlib._CONFIGURE_MUTEX: 

384 if not mapper_kw.get("non_primary", False): 

385 clsregistry.add_class( 

386 self.classname, self.cls, registry._class_registry 

387 ) 

388 

389 self._setup_inheritance(mapper_kw) 

390 

391 self._early_mapping(mapper_kw) 

392 

393 def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]: 

394 mapper_cls = Mapper 

395 

396 return self.set_cls_attribute( 

397 "__mapper__", 

398 mapper_cls(self.cls, self.local_table, **mapper_kw), 

399 ) 

400 

401 def _setup_inheritance(self, mapper_kw: _MapperKwArgs) -> None: 

402 cls = self.cls 

403 

404 inherits = mapper_kw.get("inherits", None) 

405 

406 if inherits is None: 

407 # since we search for classical mappings now, search for 

408 # multiple mapped bases as well and raise an error. 

409 inherits_search = [] 

410 for base_ in cls.__bases__: 

411 c = _resolve_for_abstract_or_classical(base_) 

412 if c is None: 

413 continue 

414 

415 if _is_supercls_for_inherits(c) and c not in inherits_search: 

416 inherits_search.append(c) 

417 

418 if inherits_search: 

419 if len(inherits_search) > 1: 

420 raise exc.InvalidRequestError( 

421 "Class %s has multiple mapped bases: %r" 

422 % (cls, inherits_search) 

423 ) 

424 inherits = inherits_search[0] 

425 elif isinstance(inherits, Mapper): 

426 inherits = inherits.class_ 

427 

428 self.inherits = inherits 

429 

430 

431class _CollectedAnnotation(NamedTuple): 

432 raw_annotation: _AnnotationScanType 

433 mapped_container: Optional[Type[Mapped[Any]]] 

434 extracted_mapped_annotation: Union[_AnnotationScanType, str] 

435 is_dataclass: bool 

436 attr_value: Any 

437 originating_module: str 

438 originating_class: Type[Any] 

439 

440 

441class _ClassScanMapperConfig(_MapperConfig): 

442 __slots__ = ( 

443 "registry", 

444 "clsdict_view", 

445 "collected_attributes", 

446 "collected_annotations", 

447 "local_table", 

448 "persist_selectable", 

449 "declared_columns", 

450 "column_ordering", 

451 "column_copies", 

452 "table_args", 

453 "tablename", 

454 "mapper_args", 

455 "mapper_args_fn", 

456 "table_fn", 

457 "inherits", 

458 "single", 

459 "allow_dataclass_fields", 

460 "dataclass_setup_arguments", 

461 "is_dataclass_prior_to_mapping", 

462 "allow_unmapped_annotations", 

463 ) 

464 

465 is_deferred = False 

466 registry: _RegistryType 

467 clsdict_view: _ClassDict 

468 collected_annotations: Dict[str, _CollectedAnnotation] 

469 collected_attributes: Dict[str, Any] 

470 local_table: Optional[FromClause] 

471 persist_selectable: Optional[FromClause] 

472 declared_columns: util.OrderedSet[Column[Any]] 

473 column_ordering: Dict[Column[Any], int] 

474 column_copies: Dict[ 

475 Union[MappedColumn[Any], Column[Any]], 

476 Union[MappedColumn[Any], Column[Any]], 

477 ] 

478 tablename: Optional[str] 

479 mapper_args: Mapping[str, Any] 

480 table_args: Optional[_TableArgsType] 

481 mapper_args_fn: Optional[Callable[[], Dict[str, Any]]] 

482 inherits: Optional[Type[Any]] 

483 single: bool 

484 

485 is_dataclass_prior_to_mapping: bool 

486 allow_unmapped_annotations: bool 

487 

488 dataclass_setup_arguments: Optional[_DataclassArguments] 

489 """if the class has SQLAlchemy native dataclass parameters, where 

490 we will turn the class into a dataclass within the declarative mapping 

491 process. 

492 

493 """ 

494 

495 allow_dataclass_fields: bool 

496 """if true, look for dataclass-processed Field objects on the target 

497 class as well as superclasses and extract ORM mapping directives from 

498 the "metadata" attribute of each Field. 

499 

500 if False, dataclass fields can still be used, however they won't be 

501 mapped. 

502 

503 """ 

504 

505 def __init__( 

506 self, 

507 registry: _RegistryType, 

508 cls_: Type[_O], 

509 dict_: _ClassDict, 

510 table: Optional[FromClause], 

511 mapper_kw: _MapperKwArgs, 

512 ): 

513 # grab class dict before the instrumentation manager has been added. 

514 # reduces cycles 

515 self.clsdict_view = ( 

516 util.immutabledict(dict_) if dict_ else util.EMPTY_DICT 

517 ) 

518 super().__init__(registry, cls_, mapper_kw) 

519 self.registry = registry 

520 self.persist_selectable = None 

521 

522 self.collected_attributes = {} 

523 self.collected_annotations = {} 

524 self.declared_columns = util.OrderedSet() 

525 self.column_ordering = {} 

526 self.column_copies = {} 

527 self.single = False 

528 self.dataclass_setup_arguments = dca = getattr( 

529 self.cls, "_sa_apply_dc_transforms", None 

530 ) 

531 

532 self.allow_unmapped_annotations = getattr( 

533 self.cls, "__allow_unmapped__", False 

534 ) or bool(self.dataclass_setup_arguments) 

535 

536 self.is_dataclass_prior_to_mapping = cld = dataclasses.is_dataclass( 

537 cls_ 

538 ) 

539 

540 sdk = _get_immediate_cls_attr(cls_, "__sa_dataclass_metadata_key__") 

541 

542 # we don't want to consume Field objects from a not-already-dataclass. 

543 # the Field objects won't have their "name" or "type" populated, 

544 # and while it seems like we could just set these on Field as we 

545 # read them, Field is documented as "user read only" and we need to 

546 # stay far away from any off-label use of dataclasses APIs. 

547 if (not cld or dca) and sdk: 

548 raise exc.InvalidRequestError( 

549 "SQLAlchemy mapped dataclasses can't consume mapping " 

550 "information from dataclass.Field() objects if the immediate " 

551 "class is not already a dataclass." 

552 ) 

553 

554 # if already a dataclass, and __sa_dataclass_metadata_key__ present, 

555 # then also look inside of dataclass.Field() objects yielded by 

556 # dataclasses.get_fields(cls) when scanning for attributes 

557 self.allow_dataclass_fields = bool(sdk and cld) 

558 

559 self._setup_declared_events() 

560 

561 self._scan_attributes() 

562 

563 self._setup_dataclasses_transforms() 

564 

565 with mapperlib._CONFIGURE_MUTEX: 

566 clsregistry.add_class( 

567 self.classname, self.cls, registry._class_registry 

568 ) 

569 

570 self._setup_inheriting_mapper(mapper_kw) 

571 

572 self._extract_mappable_attributes() 

573 

574 self._extract_declared_columns() 

575 

576 self._setup_table(table) 

577 

578 self._setup_inheriting_columns(mapper_kw) 

579 

580 self._early_mapping(mapper_kw) 

581 

582 def _setup_declared_events(self) -> None: 

583 if _get_immediate_cls_attr(self.cls, "__declare_last__"): 

584 

585 @event.listens_for(Mapper, "after_configured") 

586 def after_configured() -> None: 

587 cast( 

588 "_DeclMappedClassProtocol[Any]", self.cls 

589 ).__declare_last__() 

590 

591 if _get_immediate_cls_attr(self.cls, "__declare_first__"): 

592 

593 @event.listens_for(Mapper, "before_configured") 

594 def before_configured() -> None: 

595 cast( 

596 "_DeclMappedClassProtocol[Any]", self.cls 

597 ).__declare_first__() 

598 

599 def _cls_attr_override_checker( 

600 self, cls: Type[_O] 

601 ) -> Callable[[str, Any], bool]: 

602 """Produce a function that checks if a class has overridden an 

603 attribute, taking SQLAlchemy-enabled dataclass fields into account. 

604 

605 """ 

606 

607 if self.allow_dataclass_fields: 

608 sa_dataclass_metadata_key = _get_immediate_cls_attr( 

609 cls, "__sa_dataclass_metadata_key__" 

610 ) 

611 else: 

612 sa_dataclass_metadata_key = None 

613 

614 if not sa_dataclass_metadata_key: 

615 

616 def attribute_is_overridden(key: str, obj: Any) -> bool: 

617 return getattr(cls, key, obj) is not obj 

618 

619 else: 

620 all_datacls_fields = { 

621 f.name: f.metadata[sa_dataclass_metadata_key] 

622 for f in util.dataclass_fields(cls) 

623 if sa_dataclass_metadata_key in f.metadata 

624 } 

625 local_datacls_fields = { 

626 f.name: f.metadata[sa_dataclass_metadata_key] 

627 for f in util.local_dataclass_fields(cls) 

628 if sa_dataclass_metadata_key in f.metadata 

629 } 

630 

631 absent = object() 

632 

633 def attribute_is_overridden(key: str, obj: Any) -> bool: 

634 if _is_declarative_props(obj): 

635 obj = obj.fget 

636 

637 # this function likely has some failure modes still if 

638 # someone is doing a deep mixing of the same attribute 

639 # name as plain Python attribute vs. dataclass field. 

640 

641 ret = local_datacls_fields.get(key, absent) 

642 if _is_declarative_props(ret): 

643 ret = ret.fget 

644 

645 if ret is obj: 

646 return False 

647 elif ret is not absent: 

648 return True 

649 

650 all_field = all_datacls_fields.get(key, absent) 

651 

652 ret = getattr(cls, key, obj) 

653 

654 if ret is obj: 

655 return False 

656 

657 # for dataclasses, this could be the 

658 # 'default' of the field. so filter more specifically 

659 # for an already-mapped InstrumentedAttribute 

660 if ret is not absent and isinstance( 

661 ret, InstrumentedAttribute 

662 ): 

663 return True 

664 

665 if all_field is obj: 

666 return False 

667 elif all_field is not absent: 

668 return True 

669 

670 # can't find another attribute 

671 return False 

672 

673 return attribute_is_overridden 

674 

675 _include_dunders = { 

676 "__table__", 

677 "__mapper_args__", 

678 "__tablename__", 

679 "__table_args__", 

680 } 

681 

682 _match_exclude_dunders = re.compile(r"^(?:_sa_|__)") 

683 

684 def _cls_attr_resolver( 

685 self, cls: Type[Any] 

686 ) -> Callable[[], Iterable[Tuple[str, Any, Any, bool]]]: 

687 """produce a function to iterate the "attributes" of a class 

688 which we want to consider for mapping, adjusting for SQLAlchemy fields 

689 embedded in dataclass fields. 

690 

691 """ 

692 cls_annotations = util.get_annotations(cls) 

693 

694 cls_vars = vars(cls) 

695 

696 _include_dunders = self._include_dunders 

697 _match_exclude_dunders = self._match_exclude_dunders 

698 

699 names = [ 

700 n 

701 for n in util.merge_lists_w_ordering( 

702 list(cls_vars), list(cls_annotations) 

703 ) 

704 if not _match_exclude_dunders.match(n) or n in _include_dunders 

705 ] 

706 

707 if self.allow_dataclass_fields: 

708 sa_dataclass_metadata_key: Optional[str] = _get_immediate_cls_attr( 

709 cls, "__sa_dataclass_metadata_key__" 

710 ) 

711 else: 

712 sa_dataclass_metadata_key = None 

713 

714 if not sa_dataclass_metadata_key: 

715 

716 def local_attributes_for_class() -> ( 

717 Iterable[Tuple[str, Any, Any, bool]] 

718 ): 

719 return ( 

720 ( 

721 name, 

722 cls_vars.get(name), 

723 cls_annotations.get(name), 

724 False, 

725 ) 

726 for name in names 

727 ) 

728 

729 else: 

730 dataclass_fields = { 

731 field.name: field for field in util.local_dataclass_fields(cls) 

732 } 

733 

734 fixed_sa_dataclass_metadata_key = sa_dataclass_metadata_key 

735 

736 def local_attributes_for_class() -> ( 

737 Iterable[Tuple[str, Any, Any, bool]] 

738 ): 

739 for name in names: 

740 field = dataclass_fields.get(name, None) 

741 if field and sa_dataclass_metadata_key in field.metadata: 

742 yield field.name, _as_dc_declaredattr( 

743 field.metadata, fixed_sa_dataclass_metadata_key 

744 ), cls_annotations.get(field.name), True 

745 else: 

746 yield name, cls_vars.get(name), cls_annotations.get( 

747 name 

748 ), False 

749 

750 return local_attributes_for_class 

751 

752 def _scan_attributes(self) -> None: 

753 cls = self.cls 

754 

755 cls_as_Decl = cast("_DeclMappedClassProtocol[Any]", cls) 

756 

757 clsdict_view = self.clsdict_view 

758 collected_attributes = self.collected_attributes 

759 column_copies = self.column_copies 

760 _include_dunders = self._include_dunders 

761 mapper_args_fn = None 

762 table_args = inherited_table_args = None 

763 table_fn = None 

764 tablename = None 

765 fixed_table = "__table__" in clsdict_view 

766 

767 attribute_is_overridden = self._cls_attr_override_checker(self.cls) 

768 

769 bases = [] 

770 

771 for base in cls.__mro__: 

772 # collect bases and make sure standalone columns are copied 

773 # to be the column they will ultimately be on the class, 

774 # so that declared_attr functions use the right columns. 

775 # need to do this all the way up the hierarchy first 

776 # (see #8190) 

777 

778 class_mapped = base is not cls and _is_supercls_for_inherits(base) 

779 

780 local_attributes_for_class = self._cls_attr_resolver(base) 

781 

782 if not class_mapped and base is not cls: 

783 locally_collected_columns = self._produce_column_copies( 

784 local_attributes_for_class, 

785 attribute_is_overridden, 

786 fixed_table, 

787 base, 

788 ) 

789 else: 

790 locally_collected_columns = {} 

791 

792 bases.append( 

793 ( 

794 base, 

795 class_mapped, 

796 local_attributes_for_class, 

797 locally_collected_columns, 

798 ) 

799 ) 

800 

801 for ( 

802 base, 

803 class_mapped, 

804 local_attributes_for_class, 

805 locally_collected_columns, 

806 ) in bases: 

807 # this transfer can also take place as we scan each name 

808 # for finer-grained control of how collected_attributes is 

809 # populated, as this is what impacts column ordering. 

810 # however it's simpler to get it out of the way here. 

811 collected_attributes.update(locally_collected_columns) 

812 

813 for ( 

814 name, 

815 obj, 

816 annotation, 

817 is_dataclass_field, 

818 ) in local_attributes_for_class(): 

819 if name in _include_dunders: 

820 if name == "__mapper_args__": 

821 check_decl = _check_declared_props_nocascade( 

822 obj, name, cls 

823 ) 

824 if not mapper_args_fn and ( 

825 not class_mapped or check_decl 

826 ): 

827 # don't even invoke __mapper_args__ until 

828 # after we've determined everything about the 

829 # mapped table. 

830 # make a copy of it so a class-level dictionary 

831 # is not overwritten when we update column-based 

832 # arguments. 

833 def _mapper_args_fn() -> Dict[str, Any]: 

834 return dict(cls_as_Decl.__mapper_args__) 

835 

836 mapper_args_fn = _mapper_args_fn 

837 

838 elif name == "__tablename__": 

839 check_decl = _check_declared_props_nocascade( 

840 obj, name, cls 

841 ) 

842 if not tablename and (not class_mapped or check_decl): 

843 tablename = cls_as_Decl.__tablename__ 

844 elif name == "__table__": 

845 check_decl = _check_declared_props_nocascade( 

846 obj, name, cls 

847 ) 

848 # if a @declared_attr using "__table__" is detected, 

849 # wrap up a callable to look for "__table__" from 

850 # the final concrete class when we set up a table. 

851 # this was fixed by 

852 # #11509, regression in 2.0 from version 1.4. 

853 if check_decl and not table_fn: 

854 # don't even invoke __table__ until we're ready 

855 def _table_fn() -> FromClause: 

856 return cls_as_Decl.__table__ 

857 

858 table_fn = _table_fn 

859 

860 elif name == "__table_args__": 

861 check_decl = _check_declared_props_nocascade( 

862 obj, name, cls 

863 ) 

864 if not table_args and (not class_mapped or check_decl): 

865 table_args = cls_as_Decl.__table_args__ 

866 if not isinstance( 

867 table_args, (tuple, dict, type(None)) 

868 ): 

869 raise exc.ArgumentError( 

870 "__table_args__ value must be a tuple, " 

871 "dict, or None" 

872 ) 

873 if base is not cls: 

874 inherited_table_args = True 

875 else: 

876 # any other dunder names; should not be here 

877 # as we have tested for all four names in 

878 # _include_dunders 

879 assert False 

880 elif class_mapped: 

881 if _is_declarative_props(obj) and not obj._quiet: 

882 util.warn( 

883 "Regular (i.e. not __special__) " 

884 "attribute '%s.%s' uses @declared_attr, " 

885 "but owning class %s is mapped - " 

886 "not applying to subclass %s." 

887 % (base.__name__, name, base, cls) 

888 ) 

889 

890 continue 

891 elif base is not cls: 

892 # we're a mixin, abstract base, or something that is 

893 # acting like that for now. 

894 

895 if isinstance(obj, (Column, MappedColumn)): 

896 # already copied columns to the mapped class. 

897 continue 

898 elif isinstance(obj, MapperProperty): 

899 raise exc.InvalidRequestError( 

900 "Mapper properties (i.e. deferred," 

901 "column_property(), relationship(), etc.) must " 

902 "be declared as @declared_attr callables " 

903 "on declarative mixin classes. For dataclass " 

904 "field() objects, use a lambda:" 

905 ) 

906 elif _is_declarative_props(obj): 

907 # tried to get overloads to tell this to 

908 # pylance, no luck 

909 assert obj is not None 

910 

911 if obj._cascading: 

912 if name in clsdict_view: 

913 # unfortunately, while we can use the user- 

914 # defined attribute here to allow a clean 

915 # override, if there's another 

916 # subclass below then it still tries to use 

917 # this. not sure if there is enough 

918 # information here to add this as a feature 

919 # later on. 

920 util.warn( 

921 "Attribute '%s' on class %s cannot be " 

922 "processed due to " 

923 "@declared_attr.cascading; " 

924 "skipping" % (name, cls) 

925 ) 

926 collected_attributes[name] = column_copies[obj] = ( 

927 ret 

928 ) = obj.__get__(obj, cls) 

929 setattr(cls, name, ret) 

930 else: 

931 if is_dataclass_field: 

932 # access attribute using normal class access 

933 # first, to see if it's been mapped on a 

934 # superclass. note if the dataclasses.field() 

935 # has "default", this value can be anything. 

936 ret = getattr(cls, name, None) 

937 

938 # so, if it's anything that's not ORM 

939 # mapped, assume we should invoke the 

940 # declared_attr 

941 if not isinstance(ret, InspectionAttr): 

942 ret = obj.fget() 

943 else: 

944 # access attribute using normal class access. 

945 # if the declared attr already took place 

946 # on a superclass that is mapped, then 

947 # this is no longer a declared_attr, it will 

948 # be the InstrumentedAttribute 

949 ret = getattr(cls, name) 

950 

951 # correct for proxies created from hybrid_property 

952 # or similar. note there is no known case that 

953 # produces nested proxies, so we are only 

954 # looking one level deep right now. 

955 

956 if ( 

957 isinstance(ret, InspectionAttr) 

958 and attr_is_internal_proxy(ret) 

959 and not isinstance( 

960 ret.original_property, MapperProperty 

961 ) 

962 ): 

963 ret = ret.descriptor 

964 

965 collected_attributes[name] = column_copies[obj] = ( 

966 ret 

967 ) 

968 

969 if ( 

970 isinstance(ret, (Column, MapperProperty)) 

971 and ret.doc is None 

972 ): 

973 ret.doc = obj.__doc__ 

974 

975 self._collect_annotation( 

976 name, 

977 obj._collect_return_annotation(), 

978 base, 

979 True, 

980 obj, 

981 ) 

982 elif _is_mapped_annotation(annotation, cls, base): 

983 # Mapped annotation without any object. 

984 # product_column_copies should have handled this. 

985 # if future support for other MapperProperty, 

986 # then test if this name is already handled and 

987 # otherwise proceed to generate. 

988 if not fixed_table: 

989 assert ( 

990 name in collected_attributes 

991 or attribute_is_overridden(name, None) 

992 ) 

993 continue 

994 else: 

995 # here, the attribute is some other kind of 

996 # property that we assume is not part of the 

997 # declarative mapping. however, check for some 

998 # more common mistakes 

999 self._warn_for_decl_attributes(base, name, obj) 

1000 elif is_dataclass_field and ( 

1001 name not in clsdict_view or clsdict_view[name] is not obj 

1002 ): 

1003 # here, we are definitely looking at the target class 

1004 # and not a superclass. this is currently a 

1005 # dataclass-only path. if the name is only 

1006 # a dataclass field and isn't in local cls.__dict__, 

1007 # put the object there. 

1008 # assert that the dataclass-enabled resolver agrees 

1009 # with what we are seeing 

1010 

1011 assert not attribute_is_overridden(name, obj) 

1012 

1013 if _is_declarative_props(obj): 

1014 obj = obj.fget() 

1015 

1016 collected_attributes[name] = obj 

1017 self._collect_annotation( 

1018 name, annotation, base, False, obj 

1019 ) 

1020 else: 

1021 collected_annotation = self._collect_annotation( 

1022 name, annotation, base, None, obj 

1023 ) 

1024 is_mapped = ( 

1025 collected_annotation is not None 

1026 and collected_annotation.mapped_container is not None 

1027 ) 

1028 generated_obj = ( 

1029 collected_annotation.attr_value 

1030 if collected_annotation is not None 

1031 else obj 

1032 ) 

1033 if obj is None and not fixed_table and is_mapped: 

1034 collected_attributes[name] = ( 

1035 generated_obj 

1036 if generated_obj is not None 

1037 else MappedColumn() 

1038 ) 

1039 elif name in clsdict_view: 

1040 collected_attributes[name] = obj 

1041 # else if the name is not in the cls.__dict__, 

1042 # don't collect it as an attribute. 

1043 # we will see the annotation only, which is meaningful 

1044 # both for mapping and dataclasses setup 

1045 

1046 if inherited_table_args and not tablename: 

1047 table_args = None 

1048 

1049 self.table_args = table_args 

1050 self.tablename = tablename 

1051 self.mapper_args_fn = mapper_args_fn 

1052 self.table_fn = table_fn 

1053 

1054 def _setup_dataclasses_transforms(self) -> None: 

1055 dataclass_setup_arguments = self.dataclass_setup_arguments 

1056 if not dataclass_setup_arguments: 

1057 return 

1058 

1059 # can't use is_dataclass since it uses hasattr 

1060 if "__dataclass_fields__" in self.cls.__dict__: 

1061 raise exc.InvalidRequestError( 

1062 f"Class {self.cls} is already a dataclass; ensure that " 

1063 "base classes / decorator styles of establishing dataclasses " 

1064 "are not being mixed. " 

1065 "This can happen if a class that inherits from " 

1066 "'MappedAsDataclass', even indirectly, is been mapped with " 

1067 "'@registry.mapped_as_dataclass'" 

1068 ) 

1069 

1070 warn_for_non_dc_attrs = collections.defaultdict(list) 

1071 

1072 def _allow_dataclass_field( 

1073 key: str, originating_class: Type[Any] 

1074 ) -> bool: 

1075 if ( 

1076 originating_class is not self.cls 

1077 and "__dataclass_fields__" not in originating_class.__dict__ 

1078 ): 

1079 warn_for_non_dc_attrs[originating_class].append(key) 

1080 

1081 return True 

1082 

1083 manager = instrumentation.manager_of_class(self.cls) 

1084 assert manager is not None 

1085 

1086 field_list = [ 

1087 _AttributeOptions._get_arguments_for_make_dataclass( 

1088 key, 

1089 anno, 

1090 mapped_container, 

1091 self.collected_attributes.get(key, _NoArg.NO_ARG), 

1092 ) 

1093 for key, anno, mapped_container in ( 

1094 ( 

1095 key, 

1096 mapped_anno if mapped_anno else raw_anno, 

1097 mapped_container, 

1098 ) 

1099 for key, ( 

1100 raw_anno, 

1101 mapped_container, 

1102 mapped_anno, 

1103 is_dc, 

1104 attr_value, 

1105 originating_module, 

1106 originating_class, 

1107 ) in self.collected_annotations.items() 

1108 if _allow_dataclass_field(key, originating_class) 

1109 and ( 

1110 key not in self.collected_attributes 

1111 # issue #9226; check for attributes that we've collected 

1112 # which are already instrumented, which we would assume 

1113 # mean we are in an ORM inheritance mapping and this 

1114 # attribute is already mapped on the superclass. Under 

1115 # no circumstance should any QueryableAttribute be sent to 

1116 # the dataclass() function; anything that's mapped should 

1117 # be Field and that's it 

1118 or not isinstance( 

1119 self.collected_attributes[key], QueryableAttribute 

1120 ) 

1121 ) 

1122 ) 

1123 ] 

1124 

1125 if warn_for_non_dc_attrs: 

1126 for ( 

1127 originating_class, 

1128 non_dc_attrs, 

1129 ) in warn_for_non_dc_attrs.items(): 

1130 util.warn_deprecated( 

1131 f"When transforming {self.cls} to a dataclass, " 

1132 f"attribute(s) " 

1133 f"{', '.join(repr(key) for key in non_dc_attrs)} " 

1134 f"originates from superclass " 

1135 f"{originating_class}, which is not a dataclass. This " 

1136 f"usage is deprecated and will raise an error in " 

1137 f"SQLAlchemy 2.1. When declaring SQLAlchemy Declarative " 

1138 f"Dataclasses, ensure that all mixin classes and other " 

1139 f"superclasses which include attributes are also a " 

1140 f"subclass of MappedAsDataclass.", 

1141 "2.0", 

1142 code="dcmx", 

1143 ) 

1144 

1145 annotations = {} 

1146 defaults = {} 

1147 for item in field_list: 

1148 if len(item) == 2: 

1149 name, tp = item 

1150 elif len(item) == 3: 

1151 name, tp, spec = item 

1152 defaults[name] = spec 

1153 else: 

1154 assert False 

1155 annotations[name] = tp 

1156 

1157 for k, v in defaults.items(): 

1158 setattr(self.cls, k, v) 

1159 

1160 self._apply_dataclasses_to_any_class( 

1161 dataclass_setup_arguments, self.cls, annotations 

1162 ) 

1163 

1164 @classmethod 

1165 def _update_annotations_for_non_mapped_class( 

1166 cls, klass: Type[_O] 

1167 ) -> Mapping[str, _AnnotationScanType]: 

1168 cls_annotations = util.get_annotations(klass) 

1169 

1170 new_anno = {} 

1171 for name, annotation in cls_annotations.items(): 

1172 if _is_mapped_annotation(annotation, klass, klass): 

1173 extracted = _extract_mapped_subtype( 

1174 annotation, 

1175 klass, 

1176 klass.__module__, 

1177 name, 

1178 type(None), 

1179 required=False, 

1180 is_dataclass_field=False, 

1181 expect_mapped=False, 

1182 ) 

1183 if extracted: 

1184 inner, _ = extracted 

1185 new_anno[name] = inner 

1186 else: 

1187 new_anno[name] = annotation 

1188 return new_anno 

1189 

1190 @classmethod 

1191 def _apply_dataclasses_to_any_class( 

1192 cls, 

1193 dataclass_setup_arguments: _DataclassArguments, 

1194 klass: Type[_O], 

1195 use_annotations: Mapping[str, _AnnotationScanType], 

1196 ) -> None: 

1197 cls._assert_dc_arguments(dataclass_setup_arguments) 

1198 

1199 dataclass_callable = dataclass_setup_arguments["dataclass_callable"] 

1200 if dataclass_callable is _NoArg.NO_ARG: 

1201 dataclass_callable = dataclasses.dataclass 

1202 

1203 restored: Optional[Any] 

1204 

1205 if use_annotations: 

1206 # apply constructed annotations that should look "normal" to a 

1207 # dataclasses callable, based on the fields present. This 

1208 # means remove the Mapped[] container and ensure all Field 

1209 # entries have an annotation 

1210 restored = getattr(klass, "__annotations__", None) 

1211 klass.__annotations__ = cast("Dict[str, Any]", use_annotations) 

1212 else: 

1213 restored = None 

1214 

1215 try: 

1216 dataclass_callable( 

1217 klass, 

1218 **{ 

1219 k: v 

1220 for k, v in dataclass_setup_arguments.items() 

1221 if v is not _NoArg.NO_ARG and k != "dataclass_callable" 

1222 }, 

1223 ) 

1224 except (TypeError, ValueError) as ex: 

1225 raise exc.InvalidRequestError( 

1226 f"Python dataclasses error encountered when creating " 

1227 f"dataclass for {klass.__name__!r}: " 

1228 f"{ex!r}. Please refer to Python dataclasses " 

1229 "documentation for additional information.", 

1230 code="dcte", 

1231 ) from ex 

1232 finally: 

1233 # restore original annotations outside of the dataclasses 

1234 # process; for mixins and __abstract__ superclasses, SQLAlchemy 

1235 # Declarative will need to see the Mapped[] container inside the 

1236 # annotations in order to map subclasses 

1237 if use_annotations: 

1238 if restored is None: 

1239 del klass.__annotations__ 

1240 else: 

1241 klass.__annotations__ = restored 

1242 

1243 @classmethod 

1244 def _assert_dc_arguments(cls, arguments: _DataclassArguments) -> None: 

1245 allowed = { 

1246 "init", 

1247 "repr", 

1248 "order", 

1249 "eq", 

1250 "unsafe_hash", 

1251 "kw_only", 

1252 "match_args", 

1253 "dataclass_callable", 

1254 } 

1255 disallowed_args = set(arguments).difference(allowed) 

1256 if disallowed_args: 

1257 msg = ", ".join(f"{arg!r}" for arg in sorted(disallowed_args)) 

1258 raise exc.ArgumentError( 

1259 f"Dataclass argument(s) {msg} are not accepted" 

1260 ) 

1261 

1262 def _collect_annotation( 

1263 self, 

1264 name: str, 

1265 raw_annotation: _AnnotationScanType, 

1266 originating_class: Type[Any], 

1267 expect_mapped: Optional[bool], 

1268 attr_value: Any, 

1269 ) -> Optional[_CollectedAnnotation]: 

1270 if name in self.collected_annotations: 

1271 return self.collected_annotations[name] 

1272 

1273 if raw_annotation is None: 

1274 return None 

1275 

1276 is_dataclass = self.is_dataclass_prior_to_mapping 

1277 allow_unmapped = self.allow_unmapped_annotations 

1278 

1279 if expect_mapped is None: 

1280 is_dataclass_field = isinstance(attr_value, dataclasses.Field) 

1281 expect_mapped = ( 

1282 not is_dataclass_field 

1283 and not allow_unmapped 

1284 and ( 

1285 attr_value is None 

1286 or isinstance(attr_value, _MappedAttribute) 

1287 ) 

1288 ) 

1289 else: 

1290 is_dataclass_field = False 

1291 

1292 is_dataclass_field = False 

1293 extracted = _extract_mapped_subtype( 

1294 raw_annotation, 

1295 self.cls, 

1296 originating_class.__module__, 

1297 name, 

1298 type(attr_value), 

1299 required=False, 

1300 is_dataclass_field=is_dataclass_field, 

1301 expect_mapped=expect_mapped 

1302 and not is_dataclass, # self.allow_dataclass_fields, 

1303 ) 

1304 

1305 if extracted is None: 

1306 # ClassVar can come out here 

1307 return None 

1308 

1309 extracted_mapped_annotation, mapped_container = extracted 

1310 

1311 if attr_value is None and not is_literal(extracted_mapped_annotation): 

1312 for elem in typing_get_args(extracted_mapped_annotation): 

1313 if isinstance(elem, str) or is_fwd_ref( 

1314 elem, check_generic=True 

1315 ): 

1316 elem = de_stringify_annotation( 

1317 self.cls, 

1318 elem, 

1319 originating_class.__module__, 

1320 include_generic=True, 

1321 ) 

1322 # look in Annotated[...] for an ORM construct, 

1323 # such as Annotated[int, mapped_column(primary_key=True)] 

1324 if isinstance(elem, _IntrospectsAnnotations): 

1325 attr_value = elem.found_in_pep593_annotated() 

1326 

1327 self.collected_annotations[name] = ca = _CollectedAnnotation( 

1328 raw_annotation, 

1329 mapped_container, 

1330 extracted_mapped_annotation, 

1331 is_dataclass, 

1332 attr_value, 

1333 originating_class.__module__, 

1334 originating_class, 

1335 ) 

1336 return ca 

1337 

1338 def _warn_for_decl_attributes( 

1339 self, cls: Type[Any], key: str, c: Any 

1340 ) -> None: 

1341 if isinstance(c, expression.ColumnElement): 

1342 util.warn( 

1343 f"Attribute '{key}' on class {cls} appears to " 

1344 "be a non-schema SQLAlchemy expression " 

1345 "object; this won't be part of the declarative mapping. " 

1346 "To map arbitrary expressions, use ``column_property()`` " 

1347 "or a similar function such as ``deferred()``, " 

1348 "``query_expression()`` etc. " 

1349 ) 

1350 

1351 def _produce_column_copies( 

1352 self, 

1353 attributes_for_class: Callable[ 

1354 [], Iterable[Tuple[str, Any, Any, bool]] 

1355 ], 

1356 attribute_is_overridden: Callable[[str, Any], bool], 

1357 fixed_table: bool, 

1358 originating_class: Type[Any], 

1359 ) -> Dict[str, Union[Column[Any], MappedColumn[Any]]]: 

1360 cls = self.cls 

1361 dict_ = self.clsdict_view 

1362 locally_collected_attributes = {} 

1363 column_copies = self.column_copies 

1364 # copy mixin columns to the mapped class 

1365 

1366 for name, obj, annotation, is_dataclass in attributes_for_class(): 

1367 if ( 

1368 not fixed_table 

1369 and obj is None 

1370 and _is_mapped_annotation(annotation, cls, originating_class) 

1371 ): 

1372 # obj is None means this is the annotation only path 

1373 

1374 if attribute_is_overridden(name, obj): 

1375 # perform same "overridden" check as we do for 

1376 # Column/MappedColumn, this is how a mixin col is not 

1377 # applied to an inherited subclass that does not have 

1378 # the mixin. the anno-only path added here for 

1379 # #9564 

1380 continue 

1381 

1382 collected_annotation = self._collect_annotation( 

1383 name, annotation, originating_class, True, obj 

1384 ) 

1385 obj = ( 

1386 collected_annotation.attr_value 

1387 if collected_annotation is not None 

1388 else obj 

1389 ) 

1390 if obj is None: 

1391 obj = MappedColumn() 

1392 

1393 locally_collected_attributes[name] = obj 

1394 setattr(cls, name, obj) 

1395 

1396 elif isinstance(obj, (Column, MappedColumn)): 

1397 if attribute_is_overridden(name, obj): 

1398 # if column has been overridden 

1399 # (like by the InstrumentedAttribute of the 

1400 # superclass), skip. don't collect the annotation 

1401 # either (issue #8718) 

1402 continue 

1403 

1404 collected_annotation = self._collect_annotation( 

1405 name, annotation, originating_class, True, obj 

1406 ) 

1407 obj = ( 

1408 collected_annotation.attr_value 

1409 if collected_annotation is not None 

1410 else obj 

1411 ) 

1412 

1413 if name not in dict_ and not ( 

1414 "__table__" in dict_ 

1415 and (getattr(obj, "name", None) or name) 

1416 in dict_["__table__"].c 

1417 ): 

1418 if obj.foreign_keys: 

1419 for fk in obj.foreign_keys: 

1420 if ( 

1421 fk._table_column is not None 

1422 and fk._table_column.table is None 

1423 ): 

1424 raise exc.InvalidRequestError( 

1425 "Columns with foreign keys to " 

1426 "non-table-bound " 

1427 "columns must be declared as " 

1428 "@declared_attr callables " 

1429 "on declarative mixin classes. " 

1430 "For dataclass " 

1431 "field() objects, use a lambda:." 

1432 ) 

1433 

1434 column_copies[obj] = copy_ = obj._copy() 

1435 

1436 locally_collected_attributes[name] = copy_ 

1437 setattr(cls, name, copy_) 

1438 

1439 return locally_collected_attributes 

1440 

1441 def _extract_mappable_attributes(self) -> None: 

1442 cls = self.cls 

1443 collected_attributes = self.collected_attributes 

1444 

1445 our_stuff = self.properties 

1446 

1447 _include_dunders = self._include_dunders 

1448 

1449 late_mapped = _get_immediate_cls_attr( 

1450 cls, "_sa_decl_prepare_nocascade", strict=True 

1451 ) 

1452 

1453 allow_unmapped_annotations = self.allow_unmapped_annotations 

1454 expect_annotations_wo_mapped = ( 

1455 allow_unmapped_annotations or self.is_dataclass_prior_to_mapping 

1456 ) 

1457 

1458 look_for_dataclass_things = bool(self.dataclass_setup_arguments) 

1459 

1460 for k in list(collected_attributes): 

1461 if k in _include_dunders: 

1462 continue 

1463 

1464 value = collected_attributes[k] 

1465 

1466 if _is_declarative_props(value): 

1467 # @declared_attr in collected_attributes only occurs here for a 

1468 # @declared_attr that's directly on the mapped class; 

1469 # for a mixin, these have already been evaluated 

1470 if value._cascading: 

1471 util.warn( 

1472 "Use of @declared_attr.cascading only applies to " 

1473 "Declarative 'mixin' and 'abstract' classes. " 

1474 "Currently, this flag is ignored on mapped class " 

1475 "%s" % self.cls 

1476 ) 

1477 

1478 value = getattr(cls, k) 

1479 

1480 elif ( 

1481 isinstance(value, QueryableAttribute) 

1482 and value.class_ is not cls 

1483 and value.key != k 

1484 ): 

1485 # detect a QueryableAttribute that's already mapped being 

1486 # assigned elsewhere in userland, turn into a synonym() 

1487 value = SynonymProperty(value.key) 

1488 setattr(cls, k, value) 

1489 

1490 if ( 

1491 isinstance(value, tuple) 

1492 and len(value) == 1 

1493 and isinstance(value[0], (Column, _MappedAttribute)) 

1494 ): 

1495 util.warn( 

1496 "Ignoring declarative-like tuple value of attribute " 

1497 "'%s': possibly a copy-and-paste error with a comma " 

1498 "accidentally placed at the end of the line?" % k 

1499 ) 

1500 continue 

1501 elif look_for_dataclass_things and isinstance( 

1502 value, dataclasses.Field 

1503 ): 

1504 # we collected a dataclass Field; dataclasses would have 

1505 # set up the correct state on the class 

1506 continue 

1507 elif not isinstance(value, (Column, _DCAttributeOptions)): 

1508 # using @declared_attr for some object that 

1509 # isn't Column/MapperProperty/_DCAttributeOptions; remove 

1510 # from the clsdict_view 

1511 # and place the evaluated value onto the class. 

1512 collected_attributes.pop(k) 

1513 self._warn_for_decl_attributes(cls, k, value) 

1514 if not late_mapped: 

1515 setattr(cls, k, value) 

1516 continue 

1517 # we expect to see the name 'metadata' in some valid cases; 

1518 # however at this point we see it's assigned to something trying 

1519 # to be mapped, so raise for that. 

1520 # TODO: should "registry" here be also? might be too late 

1521 # to change that now (2.0 betas) 

1522 elif k in ("metadata",): 

1523 raise exc.InvalidRequestError( 

1524 f"Attribute name '{k}' is reserved when using the " 

1525 "Declarative API." 

1526 ) 

1527 elif isinstance(value, Column): 

1528 _undefer_column_name( 

1529 k, self.column_copies.get(value, value) # type: ignore 

1530 ) 

1531 else: 

1532 if isinstance(value, _IntrospectsAnnotations): 

1533 ( 

1534 annotation, 

1535 mapped_container, 

1536 extracted_mapped_annotation, 

1537 is_dataclass, 

1538 attr_value, 

1539 originating_module, 

1540 originating_class, 

1541 ) = self.collected_annotations.get( 

1542 k, (None, None, None, False, None, None, None) 

1543 ) 

1544 

1545 # issue #8692 - don't do any annotation interpretation if 

1546 # an annotation were present and a container such as 

1547 # Mapped[] etc. were not used. If annotation is None, 

1548 # do declarative_scan so that the property can raise 

1549 # for required 

1550 if ( 

1551 mapped_container is not None 

1552 or annotation is None 

1553 # issue #10516: need to do declarative_scan even with 

1554 # a non-Mapped annotation if we are doing 

1555 # __allow_unmapped__, for things like col.name 

1556 # assignment 

1557 or allow_unmapped_annotations 

1558 ): 

1559 try: 

1560 value.declarative_scan( 

1561 self, 

1562 self.registry, 

1563 cls, 

1564 originating_module, 

1565 k, 

1566 mapped_container, 

1567 annotation, 

1568 extracted_mapped_annotation, 

1569 is_dataclass, 

1570 ) 

1571 except NameError as ne: 

1572 raise exc.ArgumentError( 

1573 f"Could not resolve all types within mapped " 

1574 f'annotation: "{annotation}". Ensure all ' 

1575 f"types are written correctly and are " 

1576 f"imported within the module in use." 

1577 ) from ne 

1578 else: 

1579 # assert that we were expecting annotations 

1580 # without Mapped[] were going to be passed. 

1581 # otherwise an error should have been raised 

1582 # by util._extract_mapped_subtype before we got here. 

1583 assert expect_annotations_wo_mapped 

1584 

1585 if isinstance(value, _DCAttributeOptions): 

1586 if ( 

1587 value._has_dataclass_arguments 

1588 and not look_for_dataclass_things 

1589 ): 

1590 if isinstance(value, MapperProperty): 

1591 argnames = [ 

1592 "init", 

1593 "default_factory", 

1594 "repr", 

1595 "default", 

1596 ] 

1597 else: 

1598 argnames = ["init", "default_factory", "repr"] 

1599 

1600 args = { 

1601 a 

1602 for a in argnames 

1603 if getattr( 

1604 value._attribute_options, f"dataclasses_{a}" 

1605 ) 

1606 is not _NoArg.NO_ARG 

1607 } 

1608 

1609 raise exc.ArgumentError( 

1610 f"Attribute '{k}' on class {cls} includes " 

1611 f"dataclasses argument(s): " 

1612 f"{', '.join(sorted(repr(a) for a in args))} but " 

1613 f"class does not specify " 

1614 "SQLAlchemy native dataclass configuration." 

1615 ) 

1616 

1617 if not isinstance(value, (MapperProperty, _MapsColumns)): 

1618 # filter for _DCAttributeOptions objects that aren't 

1619 # MapperProperty / mapped_column(). Currently this 

1620 # includes AssociationProxy. pop it from the things 

1621 # we're going to map and set it up as a descriptor 

1622 # on the class. 

1623 collected_attributes.pop(k) 

1624 

1625 # Assoc Prox (or other descriptor object that may 

1626 # use _DCAttributeOptions) is usually here, except if 

1627 # 1. we're a 

1628 # dataclass, dataclasses would have removed the 

1629 # attr here or 2. assoc proxy is coming from a 

1630 # superclass, we want it to be direct here so it 

1631 # tracks state or 3. assoc prox comes from 

1632 # declared_attr, uncommon case 

1633 setattr(cls, k, value) 

1634 continue 

1635 

1636 our_stuff[k] = value 

1637 

1638 def _extract_declared_columns(self) -> None: 

1639 our_stuff = self.properties 

1640 

1641 # extract columns from the class dict 

1642 declared_columns = self.declared_columns 

1643 column_ordering = self.column_ordering 

1644 name_to_prop_key = collections.defaultdict(set) 

1645 

1646 for key, c in list(our_stuff.items()): 

1647 if isinstance(c, _MapsColumns): 

1648 mp_to_assign = c.mapper_property_to_assign 

1649 if mp_to_assign: 

1650 our_stuff[key] = mp_to_assign 

1651 else: 

1652 # if no mapper property to assign, this currently means 

1653 # this is a MappedColumn that will produce a Column for us 

1654 del our_stuff[key] 

1655 

1656 for col, sort_order in c.columns_to_assign: 

1657 if not isinstance(c, CompositeProperty): 

1658 name_to_prop_key[col.name].add(key) 

1659 declared_columns.add(col) 

1660 

1661 # we would assert this, however we want the below 

1662 # warning to take effect instead. See #9630 

1663 # assert col not in column_ordering 

1664 

1665 column_ordering[col] = sort_order 

1666 

1667 # if this is a MappedColumn and the attribute key we 

1668 # have is not what the column has for its key, map the 

1669 # Column explicitly under the attribute key name. 

1670 # otherwise, Mapper will map it under the column key. 

1671 if mp_to_assign is None and key != col.key: 

1672 our_stuff[key] = col 

1673 elif isinstance(c, Column): 

1674 # undefer previously occurred here, and now occurs earlier. 

1675 # ensure every column we get here has been named 

1676 assert c.name is not None 

1677 name_to_prop_key[c.name].add(key) 

1678 declared_columns.add(c) 

1679 # if the column is the same name as the key, 

1680 # remove it from the explicit properties dict. 

1681 # the normal rules for assigning column-based properties 

1682 # will take over, including precedence of columns 

1683 # in multi-column ColumnProperties. 

1684 if key == c.key: 

1685 del our_stuff[key] 

1686 

1687 for name, keys in name_to_prop_key.items(): 

1688 if len(keys) > 1: 

1689 util.warn( 

1690 "On class %r, Column object %r named " 

1691 "directly multiple times, " 

1692 "only one will be used: %s. " 

1693 "Consider using orm.synonym instead" 

1694 % (self.classname, name, (", ".join(sorted(keys)))) 

1695 ) 

1696 

1697 def _setup_table(self, table: Optional[FromClause] = None) -> None: 

1698 cls = self.cls 

1699 cls_as_Decl = cast("MappedClassProtocol[Any]", cls) 

1700 

1701 tablename = self.tablename 

1702 table_args = self.table_args 

1703 clsdict_view = self.clsdict_view 

1704 declared_columns = self.declared_columns 

1705 column_ordering = self.column_ordering 

1706 

1707 manager = attributes.manager_of_class(cls) 

1708 

1709 if ( 

1710 self.table_fn is None 

1711 and "__table__" not in clsdict_view 

1712 and table is None 

1713 ): 

1714 if hasattr(cls, "__table_cls__"): 

1715 table_cls = cast( 

1716 Type[Table], 

1717 util.unbound_method_to_callable(cls.__table_cls__), # type: ignore # noqa: E501 

1718 ) 

1719 else: 

1720 table_cls = Table 

1721 

1722 if tablename is not None: 

1723 args: Tuple[Any, ...] = () 

1724 table_kw: Dict[str, Any] = {} 

1725 

1726 if table_args: 

1727 if isinstance(table_args, dict): 

1728 table_kw = table_args 

1729 elif isinstance(table_args, tuple): 

1730 if isinstance(table_args[-1], dict): 

1731 args, table_kw = table_args[0:-1], table_args[-1] 

1732 else: 

1733 args = table_args 

1734 

1735 autoload_with = clsdict_view.get("__autoload_with__") 

1736 if autoload_with: 

1737 table_kw["autoload_with"] = autoload_with 

1738 

1739 autoload = clsdict_view.get("__autoload__") 

1740 if autoload: 

1741 table_kw["autoload"] = True 

1742 

1743 sorted_columns = sorted( 

1744 declared_columns, 

1745 key=lambda c: column_ordering.get(c, 0), 

1746 ) 

1747 table = self.set_cls_attribute( 

1748 "__table__", 

1749 table_cls( 

1750 tablename, 

1751 self._metadata_for_cls(manager), 

1752 *sorted_columns, 

1753 *args, 

1754 **table_kw, 

1755 ), 

1756 ) 

1757 else: 

1758 if table is None: 

1759 if self.table_fn: 

1760 table = self.set_cls_attribute( 

1761 "__table__", self.table_fn() 

1762 ) 

1763 else: 

1764 table = cls_as_Decl.__table__ 

1765 if declared_columns: 

1766 for c in declared_columns: 

1767 if not table.c.contains_column(c): 

1768 raise exc.ArgumentError( 

1769 "Can't add additional column %r when " 

1770 "specifying __table__" % c.key 

1771 ) 

1772 

1773 self.local_table = table 

1774 

1775 def _metadata_for_cls(self, manager: ClassManager[Any]) -> MetaData: 

1776 meta: Optional[MetaData] = getattr(self.cls, "metadata", None) 

1777 if meta is not None: 

1778 return meta 

1779 else: 

1780 return manager.registry.metadata 

1781 

1782 def _setup_inheriting_mapper(self, mapper_kw: _MapperKwArgs) -> None: 

1783 cls = self.cls 

1784 

1785 inherits = mapper_kw.get("inherits", None) 

1786 

1787 if inherits is None: 

1788 # since we search for classical mappings now, search for 

1789 # multiple mapped bases as well and raise an error. 

1790 inherits_search = [] 

1791 for base_ in cls.__bases__: 

1792 c = _resolve_for_abstract_or_classical(base_) 

1793 if c is None: 

1794 continue 

1795 

1796 if _is_supercls_for_inherits(c) and c not in inherits_search: 

1797 inherits_search.append(c) 

1798 

1799 if inherits_search: 

1800 if len(inherits_search) > 1: 

1801 raise exc.InvalidRequestError( 

1802 "Class %s has multiple mapped bases: %r" 

1803 % (cls, inherits_search) 

1804 ) 

1805 inherits = inherits_search[0] 

1806 elif isinstance(inherits, Mapper): 

1807 inherits = inherits.class_ 

1808 

1809 self.inherits = inherits 

1810 

1811 clsdict_view = self.clsdict_view 

1812 if "__table__" not in clsdict_view and self.tablename is None: 

1813 self.single = True 

1814 

1815 def _setup_inheriting_columns(self, mapper_kw: _MapperKwArgs) -> None: 

1816 table = self.local_table 

1817 cls = self.cls 

1818 table_args = self.table_args 

1819 declared_columns = self.declared_columns 

1820 

1821 if ( 

1822 table is None 

1823 and self.inherits is None 

1824 and not _get_immediate_cls_attr(cls, "__no_table__") 

1825 ): 

1826 raise exc.InvalidRequestError( 

1827 "Class %r does not have a __table__ or __tablename__ " 

1828 "specified and does not inherit from an existing " 

1829 "table-mapped class." % cls 

1830 ) 

1831 elif self.inherits: 

1832 inherited_mapper_or_config = _declared_mapping_info(self.inherits) 

1833 assert inherited_mapper_or_config is not None 

1834 inherited_table = inherited_mapper_or_config.local_table 

1835 inherited_persist_selectable = ( 

1836 inherited_mapper_or_config.persist_selectable 

1837 ) 

1838 

1839 if table is None: 

1840 # single table inheritance. 

1841 # ensure no table args 

1842 if table_args: 

1843 raise exc.ArgumentError( 

1844 "Can't place __table_args__ on an inherited class " 

1845 "with no table." 

1846 ) 

1847 

1848 # add any columns declared here to the inherited table. 

1849 if declared_columns and not isinstance(inherited_table, Table): 

1850 raise exc.ArgumentError( 

1851 f"Can't declare columns on single-table-inherited " 

1852 f"subclass {self.cls}; superclass {self.inherits} " 

1853 "is not mapped to a Table" 

1854 ) 

1855 

1856 for col in declared_columns: 

1857 assert inherited_table is not None 

1858 if col.name in inherited_table.c: 

1859 if inherited_table.c[col.name] is col: 

1860 continue 

1861 raise exc.ArgumentError( 

1862 f"Column '{col}' on class {cls.__name__} " 

1863 f"conflicts with existing column " 

1864 f"'{inherited_table.c[col.name]}'. If using " 

1865 f"Declarative, consider using the " 

1866 "use_existing_column parameter of mapped_column() " 

1867 "to resolve conflicts." 

1868 ) 

1869 if col.primary_key: 

1870 raise exc.ArgumentError( 

1871 "Can't place primary key columns on an inherited " 

1872 "class with no table." 

1873 ) 

1874 

1875 if TYPE_CHECKING: 

1876 assert isinstance(inherited_table, Table) 

1877 

1878 inherited_table.append_column(col) 

1879 if ( 

1880 inherited_persist_selectable is not None 

1881 and inherited_persist_selectable is not inherited_table 

1882 ): 

1883 inherited_persist_selectable._refresh_for_new_column( 

1884 col 

1885 ) 

1886 

1887 def _prepare_mapper_arguments(self, mapper_kw: _MapperKwArgs) -> None: 

1888 properties = self.properties 

1889 

1890 if self.mapper_args_fn: 

1891 mapper_args = self.mapper_args_fn() 

1892 else: 

1893 mapper_args = {} 

1894 

1895 if mapper_kw: 

1896 mapper_args.update(mapper_kw) 

1897 

1898 if "properties" in mapper_args: 

1899 properties = dict(properties) 

1900 properties.update(mapper_args["properties"]) 

1901 

1902 # make sure that column copies are used rather 

1903 # than the original columns from any mixins 

1904 for k in ("version_id_col", "polymorphic_on"): 

1905 if k in mapper_args: 

1906 v = mapper_args[k] 

1907 mapper_args[k] = self.column_copies.get(v, v) 

1908 

1909 if "primary_key" in mapper_args: 

1910 mapper_args["primary_key"] = [ 

1911 self.column_copies.get(v, v) 

1912 for v in util.to_list(mapper_args["primary_key"]) 

1913 ] 

1914 

1915 if "inherits" in mapper_args: 

1916 inherits_arg = mapper_args["inherits"] 

1917 if isinstance(inherits_arg, Mapper): 

1918 inherits_arg = inherits_arg.class_ 

1919 

1920 if inherits_arg is not self.inherits: 

1921 raise exc.InvalidRequestError( 

1922 "mapper inherits argument given for non-inheriting " 

1923 "class %s" % (mapper_args["inherits"]) 

1924 ) 

1925 

1926 if self.inherits: 

1927 mapper_args["inherits"] = self.inherits 

1928 

1929 if self.inherits and not mapper_args.get("concrete", False): 

1930 # note the superclass is expected to have a Mapper assigned and 

1931 # not be a deferred config, as this is called within map() 

1932 inherited_mapper = class_mapper(self.inherits, False) 

1933 inherited_table = inherited_mapper.local_table 

1934 

1935 # single or joined inheritance 

1936 # exclude any cols on the inherited table which are 

1937 # not mapped on the parent class, to avoid 

1938 # mapping columns specific to sibling/nephew classes 

1939 if "exclude_properties" not in mapper_args: 

1940 mapper_args["exclude_properties"] = exclude_properties = { 

1941 c.key 

1942 for c in inherited_table.c 

1943 if c not in inherited_mapper._columntoproperty 

1944 }.union(inherited_mapper.exclude_properties or ()) 

1945 exclude_properties.difference_update( 

1946 [c.key for c in self.declared_columns] 

1947 ) 

1948 

1949 # look through columns in the current mapper that 

1950 # are keyed to a propname different than the colname 

1951 # (if names were the same, we'd have popped it out above, 

1952 # in which case the mapper makes this combination). 

1953 # See if the superclass has a similar column property. 

1954 # If so, join them together. 

1955 for k, col in list(properties.items()): 

1956 if not isinstance(col, expression.ColumnElement): 

1957 continue 

1958 if k in inherited_mapper._props: 

1959 p = inherited_mapper._props[k] 

1960 if isinstance(p, ColumnProperty): 

1961 # note here we place the subclass column 

1962 # first. See [ticket:1892] for background. 

1963 properties[k] = [col] + p.columns 

1964 result_mapper_args = mapper_args.copy() 

1965 result_mapper_args["properties"] = properties 

1966 self.mapper_args = result_mapper_args 

1967 

1968 def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]: 

1969 self._prepare_mapper_arguments(mapper_kw) 

1970 if hasattr(self.cls, "__mapper_cls__"): 

1971 mapper_cls = cast( 

1972 "Type[Mapper[Any]]", 

1973 util.unbound_method_to_callable( 

1974 self.cls.__mapper_cls__ # type: ignore 

1975 ), 

1976 ) 

1977 else: 

1978 mapper_cls = Mapper 

1979 

1980 return self.set_cls_attribute( 

1981 "__mapper__", 

1982 mapper_cls(self.cls, self.local_table, **self.mapper_args), 

1983 ) 

1984 

1985 

1986@util.preload_module("sqlalchemy.orm.decl_api") 

1987def _as_dc_declaredattr( 

1988 field_metadata: Mapping[str, Any], sa_dataclass_metadata_key: str 

1989) -> Any: 

1990 # wrap lambdas inside dataclass fields inside an ad-hoc declared_attr. 

1991 # we can't write it because field.metadata is immutable :( so we have 

1992 # to go through extra trouble to compare these 

1993 decl_api = util.preloaded.orm_decl_api 

1994 obj = field_metadata[sa_dataclass_metadata_key] 

1995 if callable(obj) and not isinstance(obj, decl_api.declared_attr): 

1996 return decl_api.declared_attr(obj) 

1997 else: 

1998 return obj 

1999 

2000 

2001class _DeferredMapperConfig(_ClassScanMapperConfig): 

2002 _cls: weakref.ref[Type[Any]] 

2003 

2004 is_deferred = True 

2005 

2006 _configs: util.OrderedDict[ 

2007 weakref.ref[Type[Any]], _DeferredMapperConfig 

2008 ] = util.OrderedDict() 

2009 

2010 def _early_mapping(self, mapper_kw: _MapperKwArgs) -> None: 

2011 pass 

2012 

2013 # mypy disallows plain property override of variable 

2014 @property # type: ignore 

2015 def cls(self) -> Type[Any]: 

2016 return self._cls() # type: ignore 

2017 

2018 @cls.setter 

2019 def cls(self, class_: Type[Any]) -> None: 

2020 self._cls = weakref.ref(class_, self._remove_config_cls) 

2021 self._configs[self._cls] = self 

2022 

2023 @classmethod 

2024 def _remove_config_cls(cls, ref: weakref.ref[Type[Any]]) -> None: 

2025 cls._configs.pop(ref, None) 

2026 

2027 @classmethod 

2028 def has_cls(cls, class_: Type[Any]) -> bool: 

2029 # 2.6 fails on weakref if class_ is an old style class 

2030 return isinstance(class_, type) and weakref.ref(class_) in cls._configs 

2031 

2032 @classmethod 

2033 def raise_unmapped_for_cls(cls, class_: Type[Any]) -> NoReturn: 

2034 if hasattr(class_, "_sa_raise_deferred_config"): 

2035 class_._sa_raise_deferred_config() 

2036 

2037 raise orm_exc.UnmappedClassError( 

2038 class_, 

2039 msg=( 

2040 f"Class {orm_exc._safe_cls_name(class_)} has a deferred " 

2041 "mapping on it. It is not yet usable as a mapped class." 

2042 ), 

2043 ) 

2044 

2045 @classmethod 

2046 def config_for_cls(cls, class_: Type[Any]) -> _DeferredMapperConfig: 

2047 return cls._configs[weakref.ref(class_)] 

2048 

2049 @classmethod 

2050 def classes_for_base( 

2051 cls, base_cls: Type[Any], sort: bool = True 

2052 ) -> List[_DeferredMapperConfig]: 

2053 classes_for_base = [ 

2054 m 

2055 for m, cls_ in [(m, m.cls) for m in cls._configs.values()] 

2056 if cls_ is not None and issubclass(cls_, base_cls) 

2057 ] 

2058 

2059 if not sort: 

2060 return classes_for_base 

2061 

2062 all_m_by_cls = {m.cls: m for m in classes_for_base} 

2063 

2064 tuples: List[Tuple[_DeferredMapperConfig, _DeferredMapperConfig]] = [] 

2065 for m_cls in all_m_by_cls: 

2066 tuples.extend( 

2067 (all_m_by_cls[base_cls], all_m_by_cls[m_cls]) 

2068 for base_cls in m_cls.__bases__ 

2069 if base_cls in all_m_by_cls 

2070 ) 

2071 return list(topological.sort(tuples, classes_for_base)) 

2072 

2073 def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]: 

2074 self._configs.pop(self._cls, None) 

2075 return super().map(mapper_kw) 

2076 

2077 

2078def _add_attribute( 

2079 cls: Type[Any], key: str, value: MapperProperty[Any] 

2080) -> None: 

2081 """add an attribute to an existing declarative class. 

2082 

2083 This runs through the logic to determine MapperProperty, 

2084 adds it to the Mapper, adds a column to the mapped Table, etc. 

2085 

2086 """ 

2087 

2088 if "__mapper__" in cls.__dict__: 

2089 mapped_cls = cast("MappedClassProtocol[Any]", cls) 

2090 

2091 def _table_or_raise(mc: MappedClassProtocol[Any]) -> Table: 

2092 if isinstance(mc.__table__, Table): 

2093 return mc.__table__ 

2094 raise exc.InvalidRequestError( 

2095 f"Cannot add a new attribute to mapped class {mc.__name__!r} " 

2096 "because it's not mapped against a table." 

2097 ) 

2098 

2099 if isinstance(value, Column): 

2100 _undefer_column_name(key, value) 

2101 _table_or_raise(mapped_cls).append_column( 

2102 value, replace_existing=True 

2103 ) 

2104 mapped_cls.__mapper__.add_property(key, value) 

2105 elif isinstance(value, _MapsColumns): 

2106 mp = value.mapper_property_to_assign 

2107 for col, _ in value.columns_to_assign: 

2108 _undefer_column_name(key, col) 

2109 _table_or_raise(mapped_cls).append_column( 

2110 col, replace_existing=True 

2111 ) 

2112 if not mp: 

2113 mapped_cls.__mapper__.add_property(key, col) 

2114 if mp: 

2115 mapped_cls.__mapper__.add_property(key, mp) 

2116 elif isinstance(value, MapperProperty): 

2117 mapped_cls.__mapper__.add_property(key, value) 

2118 elif isinstance(value, QueryableAttribute) and value.key != key: 

2119 # detect a QueryableAttribute that's already mapped being 

2120 # assigned elsewhere in userland, turn into a synonym() 

2121 value = SynonymProperty(value.key) 

2122 mapped_cls.__mapper__.add_property(key, value) 

2123 else: 

2124 type.__setattr__(cls, key, value) 

2125 mapped_cls.__mapper__._expire_memoizations() 

2126 else: 

2127 type.__setattr__(cls, key, value) 

2128 

2129 

2130def _del_attribute(cls: Type[Any], key: str) -> None: 

2131 if ( 

2132 "__mapper__" in cls.__dict__ 

2133 and key in cls.__dict__ 

2134 and not cast( 

2135 "MappedClassProtocol[Any]", cls 

2136 ).__mapper__._dispose_called 

2137 ): 

2138 value = cls.__dict__[key] 

2139 if isinstance( 

2140 value, (Column, _MapsColumns, MapperProperty, QueryableAttribute) 

2141 ): 

2142 raise NotImplementedError( 

2143 "Can't un-map individual mapped attributes on a mapped class." 

2144 ) 

2145 else: 

2146 type.__delattr__(cls, key) 

2147 cast( 

2148 "MappedClassProtocol[Any]", cls 

2149 ).__mapper__._expire_memoizations() 

2150 else: 

2151 type.__delattr__(cls, key) 

2152 

2153 

2154def _declarative_constructor(self: Any, **kwargs: Any) -> None: 

2155 """A simple constructor that allows initialization from kwargs. 

2156 

2157 Sets attributes on the constructed instance using the names and 

2158 values in ``kwargs``. 

2159 

2160 Only keys that are present as 

2161 attributes of the instance's class are allowed. These could be, 

2162 for example, any mapped columns or relationships. 

2163 """ 

2164 cls_ = type(self) 

2165 for k in kwargs: 

2166 if not hasattr(cls_, k): 

2167 raise TypeError( 

2168 "%r is an invalid keyword argument for %s" % (k, cls_.__name__) 

2169 ) 

2170 setattr(self, k, kwargs[k]) 

2171 

2172 

2173_declarative_constructor.__name__ = "__init__" 

2174 

2175 

2176def _undefer_column_name(key: str, column: Column[Any]) -> None: 

2177 if column.key is None: 

2178 column.key = key 

2179 if column.name is None: 

2180 column.name = key