Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/decl_base.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

914 statements  

1# orm/decl_base.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Internal implementation for declarative.""" 

9 

10from __future__ import annotations 

11 

12import collections 

13import dataclasses 

14import itertools 

15import re 

16from typing import Any 

17from typing import Callable 

18from typing import cast 

19from typing import Dict 

20from typing import get_args 

21from typing import Iterable 

22from typing import List 

23from typing import Mapping 

24from typing import NamedTuple 

25from typing import NoReturn 

26from typing import Optional 

27from typing import Protocol 

28from typing import Sequence 

29from typing import Tuple 

30from typing import Type 

31from typing import TYPE_CHECKING 

32from typing import TypeVar 

33from typing import Union 

34import weakref 

35 

36from . import attributes 

37from . import clsregistry 

38from . import exc as orm_exc 

39from . import instrumentation 

40from . import mapperlib 

41from ._typing import _O 

42from ._typing import attr_is_internal_proxy 

43from .attributes import InstrumentedAttribute 

44from .attributes import QueryableAttribute 

45from .base import _is_mapped_class 

46from .base import InspectionAttr 

47from .descriptor_props import CompositeProperty 

48from .descriptor_props import SynonymProperty 

49from .interfaces import _AttributeOptions 

50from .interfaces import _DataclassArguments 

51from .interfaces import _DCAttributeOptions 

52from .interfaces import _IntrospectsAnnotations 

53from .interfaces import _MappedAttribute 

54from .interfaces import _MapsColumns 

55from .interfaces import MapperProperty 

56from .mapper import Mapper 

57from .properties import ColumnProperty 

58from .properties import MappedColumn 

59from .util import _extract_mapped_subtype 

60from .util import _is_mapped_annotation 

61from .util import class_mapper 

62from .util import de_stringify_annotation 

63from .. import event 

64from .. import exc 

65from .. import util 

66from ..sql import expression 

67from ..sql.base import _NoArg 

68from ..sql.schema import Column 

69from ..sql.schema import Table 

70from ..util import topological 

71from ..util.typing import _AnnotationScanType 

72from ..util.typing import is_fwd_ref 

73from ..util.typing import is_literal 

74 

75if TYPE_CHECKING: 

76 from ._typing import _ClassDict 

77 from ._typing import _RegistryType 

78 from .base import Mapped 

79 from .decl_api import declared_attr 

80 from .instrumentation import ClassManager 

81 from ..sql.elements import NamedColumn 

82 from ..sql.schema import MetaData 

83 from ..sql.selectable import FromClause 

84 

85_T = TypeVar("_T", bound=Any) 

86 

87_MapperKwArgs = Mapping[str, Any] 

88_TableArgsType = Union[Tuple[Any, ...], Dict[str, Any]] 

89 

90 

91class MappedClassProtocol(Protocol[_O]): 

92 """A protocol representing a SQLAlchemy mapped class. 

93 

94 The protocol is generic on the type of class, use 

95 ``MappedClassProtocol[Any]`` to allow any mapped class. 

96 """ 

97 

98 __name__: str 

99 __mapper__: Mapper[_O] 

100 __table__: FromClause 

101 

102 def __call__(self, **kw: Any) -> _O: ... 

103 

104 

105class _DeclMappedClassProtocol(MappedClassProtocol[_O], Protocol): 

106 "Internal more detailed version of ``MappedClassProtocol``." 

107 

108 metadata: MetaData 

109 __tablename__: str 

110 __mapper_args__: _MapperKwArgs 

111 __table_args__: Optional[_TableArgsType] 

112 

113 _sa_apply_dc_transforms: Optional[_DataclassArguments] 

114 

115 def __declare_first__(self) -> None: ... 

116 

117 def __declare_last__(self) -> None: ... 

118 

119 

120def _declared_mapping_info( 

121 cls: Type[Any], 

122) -> Optional[Union[_DeferredDeclarativeConfig, Mapper[Any]]]: 

123 # deferred mapping 

124 if _DeferredDeclarativeConfig.has_cls(cls): 

125 return _DeferredDeclarativeConfig.config_for_cls(cls) 

126 # regular mapping 

127 elif _is_mapped_class(cls): 

128 return class_mapper(cls, configure=False) 

129 else: 

130 return None 

131 

132 

133def _is_supercls_for_inherits(cls: Type[Any]) -> bool: 

134 """return True if this class will be used as a superclass to set in 

135 'inherits'. 

136 

137 This includes deferred mapper configs that aren't mapped yet, however does 

138 not include classes with _sa_decl_prepare_nocascade (e.g. 

139 ``AbstractConcreteBase``); these concrete-only classes are not set up as 

140 "inherits" until after mappers are configured using 

141 mapper._set_concrete_base() 

142 

143 """ 

144 if _DeferredDeclarativeConfig.has_cls(cls): 

145 return not _get_immediate_cls_attr( 

146 cls, "_sa_decl_prepare_nocascade", strict=True 

147 ) 

148 # regular mapping 

149 elif _is_mapped_class(cls): 

150 return True 

151 else: 

152 return False 

153 

154 

155def _resolve_for_abstract_or_classical(cls: Type[Any]) -> Optional[Type[Any]]: 

156 if cls is object: 

157 return None 

158 

159 sup: Optional[Type[Any]] 

160 

161 if cls.__dict__.get("__abstract__", False): 

162 for base_ in cls.__bases__: 

163 sup = _resolve_for_abstract_or_classical(base_) 

164 if sup is not None: 

165 return sup 

166 else: 

167 return None 

168 else: 

169 clsmanager = _dive_for_cls_manager(cls) 

170 

171 if clsmanager: 

172 return clsmanager.class_ 

173 else: 

174 return cls 

175 

176 

177def _get_immediate_cls_attr( 

178 cls: Type[Any], attrname: str, strict: bool = False 

179) -> Optional[Any]: 

180 """return an attribute of the class that is either present directly 

181 on the class, e.g. not on a superclass, or is from a superclass but 

182 this superclass is a non-mapped mixin, that is, not a descendant of 

183 the declarative base and is also not classically mapped. 

184 

185 This is used to detect attributes that indicate something about 

186 a mapped class independently from any mapped classes that it may 

187 inherit from. 

188 

189 """ 

190 

191 # the rules are different for this name than others, 

192 # make sure we've moved it out. transitional 

193 assert attrname != "__abstract__" 

194 

195 if not issubclass(cls, object): 

196 return None 

197 

198 if attrname in cls.__dict__: 

199 return getattr(cls, attrname) 

200 

201 for base in cls.__mro__[1:]: 

202 _is_classical_inherits = _dive_for_cls_manager(base) is not None 

203 

204 if attrname in base.__dict__ and ( 

205 base is cls 

206 or ( 

207 (base in cls.__bases__ if strict else True) 

208 and not _is_classical_inherits 

209 ) 

210 ): 

211 return getattr(base, attrname) 

212 else: 

213 return None 

214 

215 

216def _dive_for_cls_manager(cls: Type[_O]) -> Optional[ClassManager[_O]]: 

217 # because the class manager registration is pluggable, 

218 # we need to do the search for every class in the hierarchy, 

219 # rather than just a simple "cls._sa_class_manager" 

220 

221 for base in cls.__mro__: 

222 manager: Optional[ClassManager[_O]] = attributes.opt_manager_of_class( 

223 base 

224 ) 

225 if manager: 

226 return manager 

227 return None 

228 

229 

230@util.preload_module("sqlalchemy.orm.decl_api") 

231def _is_declarative_props(obj: Any) -> bool: 

232 _declared_attr_common = util.preloaded.orm_decl_api._declared_attr_common 

233 

234 return isinstance(obj, (_declared_attr_common, util.classproperty)) 

235 

236 

237def _check_declared_props_nocascade( 

238 obj: Any, name: str, cls: Type[_O] 

239) -> bool: 

240 if _is_declarative_props(obj): 

241 if getattr(obj, "_cascading", False): 

242 util.warn( 

243 "@declared_attr.cascading is not supported on the %s " 

244 "attribute on class %s. This attribute invokes for " 

245 "subclasses in any case." % (name, cls) 

246 ) 

247 return True 

248 else: 

249 return False 

250 

251 

252class _ORMClassConfigurator: 

253 """Object that configures a class that's potentially going to be 

254 mapped, and/or turned into an ORM dataclass. 

255 

256 This is the base class for all the configurator objects. 

257 

258 """ 

259 

260 __slots__ = ("cls", "classname", "__weakref__") 

261 

262 cls: Type[Any] 

263 classname: str 

264 

265 def __init__(self, cls_: Type[Any]): 

266 self.cls = util.assert_arg_type(cls_, type, "cls_") 

267 self.classname = cls_.__name__ 

268 

269 @classmethod 

270 def _as_declarative( 

271 cls, registry: _RegistryType, cls_: Type[Any], dict_: _ClassDict 

272 ) -> Optional[_MapperConfig]: 

273 manager = attributes.opt_manager_of_class(cls_) 

274 if manager and manager.class_ is cls_: 

275 raise exc.InvalidRequestError( 

276 f"Class {cls_!r} already has been instrumented declaratively" 

277 ) 

278 

279 if cls_.__dict__.get("__abstract__", False): 

280 return None 

281 

282 defer_map = _get_immediate_cls_attr( 

283 cls_, "_sa_decl_prepare_nocascade", strict=True 

284 ) or hasattr(cls_, "_sa_decl_prepare") 

285 

286 if defer_map: 

287 return _DeferredDeclarativeConfig(registry, cls_, dict_) 

288 else: 

289 return _DeclarativeMapperConfig(registry, cls_, dict_) 

290 

291 @classmethod 

292 def _as_unmapped_dataclass( 

293 cls, cls_: Type[Any], dict_: _ClassDict 

294 ) -> _UnmappedDataclassConfig: 

295 return _UnmappedDataclassConfig(cls_, dict_) 

296 

297 @classmethod 

298 def _mapper( 

299 cls, 

300 registry: _RegistryType, 

301 cls_: Type[_O], 

302 table: Optional[FromClause], 

303 mapper_kw: _MapperKwArgs, 

304 ) -> Mapper[_O]: 

305 _ImperativeMapperConfig(registry, cls_, table, mapper_kw) 

306 return cast("MappedClassProtocol[_O]", cls_).__mapper__ 

307 

308 

309class _MapperConfig(_ORMClassConfigurator): 

310 """Configurator that configures a class that's potentially going to be 

311 mapped, and optionally turned into a dataclass as well.""" 

312 

313 __slots__ = ( 

314 "properties", 

315 "declared_attr_reg", 

316 ) 

317 

318 properties: util.OrderedDict[ 

319 str, 

320 Union[ 

321 Sequence[NamedColumn[Any]], NamedColumn[Any], MapperProperty[Any] 

322 ], 

323 ] 

324 declared_attr_reg: Dict[declared_attr[Any], Any] 

325 

326 def __init__( 

327 self, 

328 registry: _RegistryType, 

329 cls_: Type[Any], 

330 ): 

331 super().__init__(cls_) 

332 self.properties = util.OrderedDict() 

333 self.declared_attr_reg = {} 

334 

335 instrumentation.register_class( 

336 self.cls, 

337 finalize=False, 

338 registry=registry, 

339 declarative_scan=self, 

340 init_method=registry.constructor, 

341 ) 

342 

343 def set_cls_attribute(self, attrname: str, value: _T) -> _T: 

344 manager = instrumentation.manager_of_class(self.cls) 

345 manager.install_member(attrname, value) 

346 return value 

347 

348 def map(self, mapper_kw: _MapperKwArgs) -> Mapper[Any]: 

349 raise NotImplementedError() 

350 

351 def _early_mapping(self, mapper_kw: _MapperKwArgs) -> None: 

352 self.map(mapper_kw) 

353 

354 

355class _ImperativeMapperConfig(_MapperConfig): 

356 """Configurator that configures a class for an imperative mapping.""" 

357 

358 __slots__ = ("local_table", "inherits") 

359 

360 def __init__( 

361 self, 

362 registry: _RegistryType, 

363 cls_: Type[_O], 

364 table: Optional[FromClause], 

365 mapper_kw: _MapperKwArgs, 

366 ): 

367 super().__init__(registry, cls_) 

368 

369 self.local_table = self.set_cls_attribute("__table__", table) 

370 

371 with mapperlib._CONFIGURE_MUTEX: 

372 clsregistry._add_class( 

373 self.classname, self.cls, registry._class_registry 

374 ) 

375 

376 self._setup_inheritance(mapper_kw) 

377 

378 self._early_mapping(mapper_kw) 

379 

380 def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]: 

381 mapper_cls = Mapper 

382 

383 return self.set_cls_attribute( 

384 "__mapper__", 

385 mapper_cls(self.cls, self.local_table, **mapper_kw), 

386 ) 

387 

388 def _setup_inheritance(self, mapper_kw: _MapperKwArgs) -> None: 

389 cls = self.cls 

390 

391 inherits = None 

392 inherits_search = [] 

393 

394 # since we search for classical mappings now, search for 

395 # multiple mapped bases as well and raise an error. 

396 for base_ in cls.__bases__: 

397 c = _resolve_for_abstract_or_classical(base_) 

398 if c is None: 

399 continue 

400 

401 if _is_supercls_for_inherits(c) and c not in inherits_search: 

402 inherits_search.append(c) 

403 

404 if inherits_search: 

405 if len(inherits_search) > 1: 

406 raise exc.InvalidRequestError( 

407 "Class %s has multiple mapped bases: %r" 

408 % (cls, inherits_search) 

409 ) 

410 inherits = inherits_search[0] 

411 

412 self.inherits = inherits 

413 

414 

415class _CollectedAnnotation(NamedTuple): 

416 raw_annotation: _AnnotationScanType 

417 mapped_container: Optional[Type[Mapped[Any]]] 

418 extracted_mapped_annotation: Union[_AnnotationScanType, str] 

419 is_dataclass: bool 

420 attr_value: Any 

421 originating_module: str 

422 originating_class: Type[Any] 

423 

424 

425class _ClassScanAbstractConfig(_ORMClassConfigurator): 

426 """Abstract base for a configurator that configures a class for a 

427 declarative mapping, or an unmapped ORM dataclass. 

428 

429 Defines scanning of pep-484 annotations as well as ORM dataclass 

430 applicators 

431 

432 """ 

433 

434 __slots__ = () 

435 

436 clsdict_view: _ClassDict 

437 collected_annotations: Dict[str, _CollectedAnnotation] 

438 collected_attributes: Dict[str, Any] 

439 

440 is_dataclass_prior_to_mapping: bool 

441 allow_unmapped_annotations: bool 

442 

443 dataclass_setup_arguments: Optional[_DataclassArguments] 

444 """if the class has SQLAlchemy native dataclass parameters, where 

445 we will turn the class into a dataclass within the declarative mapping 

446 process. 

447 

448 """ 

449 

450 allow_dataclass_fields: bool 

451 """if true, look for dataclass-processed Field objects on the target 

452 class as well as superclasses and extract ORM mapping directives from 

453 the "metadata" attribute of each Field. 

454 

455 if False, dataclass fields can still be used, however they won't be 

456 mapped. 

457 

458 """ 

459 

460 _include_dunders = { 

461 "__table__", 

462 "__mapper_args__", 

463 "__tablename__", 

464 "__table_args__", 

465 } 

466 

467 _match_exclude_dunders = re.compile(r"^(?:_sa_|__)") 

468 

469 def _scan_attributes(self) -> None: 

470 raise NotImplementedError() 

471 

472 def _setup_dataclasses_transforms( 

473 self, *, enable_descriptor_defaults: bool, revert: bool = False 

474 ) -> None: 

475 dataclass_setup_arguments = self.dataclass_setup_arguments 

476 if not dataclass_setup_arguments: 

477 return 

478 

479 # can't use is_dataclass since it uses hasattr 

480 if "__dataclass_fields__" in self.cls.__dict__: 

481 raise exc.InvalidRequestError( 

482 f"Class {self.cls} is already a dataclass; ensure that " 

483 "base classes / decorator styles of establishing dataclasses " 

484 "are not being mixed. " 

485 "This can happen if a class that inherits from " 

486 "'MappedAsDataclass', even indirectly, is been mapped with " 

487 "'@registry.mapped_as_dataclass'" 

488 ) 

489 

490 # can't create a dataclass if __table__ is already there. This would 

491 # fail an assertion when calling _get_arguments_for_make_dataclass: 

492 # assert False, "Mapped[] received without a mapping declaration" 

493 if "__table__" in self.cls.__dict__: 

494 raise exc.InvalidRequestError( 

495 f"Class {self.cls} already defines a '__table__'. " 

496 "ORM Annotated Dataclasses do not support a pre-existing " 

497 "'__table__' element" 

498 ) 

499 

500 raise_for_non_dc_attrs = collections.defaultdict(list) 

501 

502 def _allow_dataclass_field( 

503 key: str, originating_class: Type[Any] 

504 ) -> bool: 

505 if ( 

506 originating_class is not self.cls 

507 and "__dataclass_fields__" not in originating_class.__dict__ 

508 ): 

509 raise_for_non_dc_attrs[originating_class].append(key) 

510 

511 return True 

512 

513 field_list = [ 

514 _AttributeOptions._get_arguments_for_make_dataclass( 

515 self, 

516 key, 

517 anno, 

518 mapped_container, 

519 self.collected_attributes.get(key, _NoArg.NO_ARG), 

520 dataclass_setup_arguments, 

521 enable_descriptor_defaults, 

522 ) 

523 for key, anno, mapped_container in ( 

524 ( 

525 key, 

526 raw_anno, 

527 mapped_container, 

528 ) 

529 for key, ( 

530 raw_anno, 

531 mapped_container, 

532 mapped_anno, 

533 is_dc, 

534 attr_value, 

535 originating_module, 

536 originating_class, 

537 ) in self.collected_annotations.items() 

538 if _allow_dataclass_field(key, originating_class) 

539 and ( 

540 key not in self.collected_attributes 

541 # issue #9226; check for attributes that we've collected 

542 # which are already instrumented, which we would assume 

543 # mean we are in an ORM inheritance mapping and this 

544 # attribute is already mapped on the superclass. Under 

545 # no circumstance should any QueryableAttribute be sent to 

546 # the dataclass() function; anything that's mapped should 

547 # be Field and that's it 

548 or not isinstance( 

549 self.collected_attributes[key], QueryableAttribute 

550 ) 

551 ) 

552 ) 

553 ] 

554 if raise_for_non_dc_attrs: 

555 for ( 

556 originating_class, 

557 non_dc_attrs, 

558 ) in raise_for_non_dc_attrs.items(): 

559 raise exc.InvalidRequestError( 

560 f"When transforming {self.cls} to a dataclass, " 

561 f"attribute(s) " 

562 f"{', '.join(repr(key) for key in non_dc_attrs)} " 

563 f"originates from superclass " 

564 f"{originating_class}, which is not a dataclass. When " 

565 f"declaring SQLAlchemy Declarative " 

566 f"Dataclasses, ensure that all mixin classes and other " 

567 f"superclasses which include attributes are also a " 

568 f"subclass of MappedAsDataclass or make use of the " 

569 f"@unmapped_dataclass decorator.", 

570 code="dcmx", 

571 ) 

572 

573 if revert: 

574 # the "revert" case is used only by an unmapped mixin class 

575 # that is nonetheless using Mapped construct and needs to 

576 # itself be a dataclass 

577 revert_dict = { 

578 name: self.cls.__dict__[name] 

579 for name in (item[0] for item in field_list) 

580 if name in self.cls.__dict__ 

581 } 

582 else: 

583 revert_dict = None 

584 

585 # get original annotations using ForwardRef for symbols that 

586 # are unresolvable 

587 orig_annotations = util.get_annotations(self.cls) 

588 

589 # build a new __annotations__ dict from the fields we have. 

590 # this has to be done carefully since we have to maintain 

591 # the correct order! wow 

592 swap_annotations = {} 

593 defaults = {} 

594 

595 for item in field_list: 

596 if len(item) == 2: 

597 name, tp = item 

598 elif len(item) == 3: 

599 name, tp, spec = item 

600 defaults[name] = spec 

601 else: 

602 assert False 

603 

604 # add the annotation to the new dict we are creating. 

605 # note that if name is in orig_annotations, we expect 

606 # tp and orig_annotations[name] to be identical. 

607 swap_annotations[name] = orig_annotations.get(name, tp) 

608 

609 for k, v in defaults.items(): 

610 setattr(self.cls, k, v) 

611 

612 self._assert_dc_arguments(dataclass_setup_arguments) 

613 

614 dataclass_callable = dataclass_setup_arguments["dataclass_callable"] 

615 if dataclass_callable is _NoArg.NO_ARG: 

616 dataclass_callable = dataclasses.dataclass 

617 

618 # create a merged __annotations__ dictionary, maintaining order 

619 # as best we can: 

620 

621 # 1. merge all keys in orig_annotations that occur before 

622 # we see any of our mapped fields (this can be attributes like 

623 # __table_args__ etc.) 

624 new_annotations = { 

625 k: orig_annotations[k] 

626 for k in itertools.takewhile( 

627 lambda k: k not in swap_annotations, orig_annotations 

628 ) 

629 } 

630 

631 # 2. then put in all the dataclass annotations we have 

632 new_annotations |= swap_annotations 

633 

634 # 3. them merge all of orig_annotations which will add remaining 

635 # keys 

636 new_annotations |= orig_annotations 

637 

638 # 4. this becomes the new class annotations. 

639 restore_anno = util.restore_annotations(self.cls, new_annotations) 

640 

641 try: 

642 dataclass_callable( # type: ignore[call-overload] 

643 self.cls, 

644 **{ # type: ignore[call-overload,unused-ignore] 

645 k: v 

646 for k, v in dataclass_setup_arguments.items() 

647 if v is not _NoArg.NO_ARG 

648 and k not in ("dataclass_callable",) 

649 }, 

650 ) 

651 except (TypeError, ValueError) as ex: 

652 raise exc.InvalidRequestError( 

653 f"Python dataclasses error encountered when creating " 

654 f"dataclass for {self.cls.__name__!r}: " 

655 f"{ex!r}. Please refer to Python dataclasses " 

656 "documentation for additional information.", 

657 code="dcte", 

658 ) from ex 

659 finally: 

660 if revert and revert_dict: 

661 # used for mixin dataclasses; we have to restore the 

662 # mapped_column(), relationship() etc. to the class so these 

663 # take place for a mapped class scan 

664 for k, v in revert_dict.items(): 

665 setattr(self.cls, k, v) 

666 

667 restore_anno() 

668 

669 def _collect_annotation( 

670 self, 

671 name: str, 

672 raw_annotation: _AnnotationScanType, 

673 originating_class: Type[Any], 

674 expect_mapped: Optional[bool], 

675 attr_value: Any, 

676 ) -> Optional[_CollectedAnnotation]: 

677 if name in self.collected_annotations: 

678 return self.collected_annotations[name] 

679 

680 if raw_annotation is None: 

681 return None 

682 

683 is_dataclass = self.is_dataclass_prior_to_mapping 

684 allow_unmapped = self.allow_unmapped_annotations 

685 

686 if expect_mapped is None: 

687 is_dataclass_field = isinstance(attr_value, dataclasses.Field) 

688 expect_mapped = ( 

689 not is_dataclass_field 

690 and not allow_unmapped 

691 and ( 

692 attr_value is None 

693 or isinstance(attr_value, _MappedAttribute) 

694 ) 

695 ) 

696 

697 is_dataclass_field = False 

698 extracted = _extract_mapped_subtype( 

699 raw_annotation, 

700 self.cls, 

701 originating_class.__module__, 

702 name, 

703 type(attr_value), 

704 required=False, 

705 is_dataclass_field=is_dataclass_field, 

706 expect_mapped=expect_mapped and not is_dataclass, 

707 ) 

708 if extracted is None: 

709 # ClassVar can come out here 

710 return None 

711 

712 extracted_mapped_annotation, mapped_container = extracted 

713 

714 if attr_value is None and not is_literal(extracted_mapped_annotation): 

715 for elem in get_args(extracted_mapped_annotation): 

716 if is_fwd_ref( 

717 elem, check_generic=True, check_for_plain_string=True 

718 ): 

719 elem = de_stringify_annotation( 

720 self.cls, 

721 elem, 

722 originating_class.__module__, 

723 include_generic=True, 

724 ) 

725 # look in Annotated[...] for an ORM construct, 

726 # such as Annotated[int, mapped_column(primary_key=True)] 

727 if isinstance(elem, _IntrospectsAnnotations): 

728 attr_value = elem.found_in_pep593_annotated() 

729 

730 self.collected_annotations[name] = ca = _CollectedAnnotation( 

731 raw_annotation, 

732 mapped_container, 

733 extracted_mapped_annotation, 

734 is_dataclass, 

735 attr_value, 

736 originating_class.__module__, 

737 originating_class, 

738 ) 

739 return ca 

740 

741 @classmethod 

742 def _assert_dc_arguments(cls, arguments: _DataclassArguments) -> None: 

743 allowed = { 

744 "init", 

745 "repr", 

746 "order", 

747 "eq", 

748 "unsafe_hash", 

749 "kw_only", 

750 "match_args", 

751 "dataclass_callable", 

752 } 

753 disallowed_args = set(arguments).difference(allowed) 

754 if disallowed_args: 

755 msg = ", ".join(f"{arg!r}" for arg in sorted(disallowed_args)) 

756 raise exc.ArgumentError( 

757 f"Dataclass argument(s) {msg} are not accepted" 

758 ) 

759 

760 def _cls_attr_override_checker( 

761 self, cls: Type[_O] 

762 ) -> Callable[[str, Any], bool]: 

763 """Produce a function that checks if a class has overridden an 

764 attribute, taking SQLAlchemy-enabled dataclass fields into account. 

765 

766 """ 

767 

768 if self.allow_dataclass_fields: 

769 sa_dataclass_metadata_key = _get_immediate_cls_attr( 

770 cls, "__sa_dataclass_metadata_key__" 

771 ) 

772 else: 

773 sa_dataclass_metadata_key = None 

774 

775 if not sa_dataclass_metadata_key: 

776 

777 def attribute_is_overridden(key: str, obj: Any) -> bool: 

778 return getattr(cls, key, obj) is not obj 

779 

780 else: 

781 all_datacls_fields = { 

782 f.name: f.metadata[sa_dataclass_metadata_key] 

783 for f in util.dataclass_fields(cls) 

784 if sa_dataclass_metadata_key in f.metadata 

785 } 

786 local_datacls_fields = { 

787 f.name: f.metadata[sa_dataclass_metadata_key] 

788 for f in util.local_dataclass_fields(cls) 

789 if sa_dataclass_metadata_key in f.metadata 

790 } 

791 

792 absent = object() 

793 

794 def attribute_is_overridden(key: str, obj: Any) -> bool: 

795 if _is_declarative_props(obj): 

796 obj = obj.fget 

797 

798 # this function likely has some failure modes still if 

799 # someone is doing a deep mixing of the same attribute 

800 # name as plain Python attribute vs. dataclass field. 

801 

802 ret = local_datacls_fields.get(key, absent) 

803 if _is_declarative_props(ret): 

804 ret = ret.fget 

805 

806 if ret is obj: 

807 return False 

808 elif ret is not absent: 

809 return True 

810 

811 all_field = all_datacls_fields.get(key, absent) 

812 

813 ret = getattr(cls, key, obj) 

814 

815 if ret is obj: 

816 return False 

817 

818 # for dataclasses, this could be the 

819 # 'default' of the field. so filter more specifically 

820 # for an already-mapped InstrumentedAttribute 

821 if ret is not absent and isinstance( 

822 ret, InstrumentedAttribute 

823 ): 

824 return True 

825 

826 if all_field is obj: 

827 return False 

828 elif all_field is not absent: 

829 return True 

830 

831 # can't find another attribute 

832 return False 

833 

834 return attribute_is_overridden 

835 

836 def _cls_attr_resolver( 

837 self, cls: Type[Any] 

838 ) -> Callable[[], Iterable[Tuple[str, Any, Any, bool]]]: 

839 """produce a function to iterate the "attributes" of a class 

840 which we want to consider for mapping, adjusting for SQLAlchemy fields 

841 embedded in dataclass fields. 

842 

843 """ 

844 cls_annotations = util.get_annotations(cls) 

845 

846 cls_vars = vars(cls) 

847 

848 _include_dunders = self._include_dunders 

849 _match_exclude_dunders = self._match_exclude_dunders 

850 

851 names = [ 

852 n 

853 for n in util.merge_lists_w_ordering( 

854 list(cls_vars), list(cls_annotations) 

855 ) 

856 if not _match_exclude_dunders.match(n) or n in _include_dunders 

857 ] 

858 

859 if self.allow_dataclass_fields: 

860 sa_dataclass_metadata_key: Optional[str] = _get_immediate_cls_attr( 

861 cls, "__sa_dataclass_metadata_key__" 

862 ) 

863 else: 

864 sa_dataclass_metadata_key = None 

865 

866 if not sa_dataclass_metadata_key: 

867 

868 def local_attributes_for_class() -> ( 

869 Iterable[Tuple[str, Any, Any, bool]] 

870 ): 

871 return ( 

872 ( 

873 name, 

874 cls_vars.get(name), 

875 cls_annotations.get(name), 

876 False, 

877 ) 

878 for name in names 

879 ) 

880 

881 else: 

882 dataclass_fields = { 

883 field.name: field for field in util.local_dataclass_fields(cls) 

884 } 

885 

886 fixed_sa_dataclass_metadata_key = sa_dataclass_metadata_key 

887 

888 def local_attributes_for_class() -> ( 

889 Iterable[Tuple[str, Any, Any, bool]] 

890 ): 

891 for name in names: 

892 field = dataclass_fields.get(name, None) 

893 if field and sa_dataclass_metadata_key in field.metadata: 

894 yield field.name, _as_dc_declaredattr( 

895 field.metadata, fixed_sa_dataclass_metadata_key 

896 ), cls_annotations.get(field.name), True 

897 else: 

898 yield name, cls_vars.get(name), cls_annotations.get( 

899 name 

900 ), False 

901 

902 return local_attributes_for_class 

903 

904 

905class _DeclarativeMapperConfig(_MapperConfig, _ClassScanAbstractConfig): 

906 """Configurator that will produce a declarative mapped class""" 

907 

908 __slots__ = ( 

909 "registry", 

910 "local_table", 

911 "persist_selectable", 

912 "declared_columns", 

913 "column_ordering", 

914 "column_copies", 

915 "table_args", 

916 "tablename", 

917 "mapper_args", 

918 "mapper_args_fn", 

919 "table_fn", 

920 "inherits", 

921 "single", 

922 "clsdict_view", 

923 "collected_attributes", 

924 "collected_annotations", 

925 "allow_dataclass_fields", 

926 "dataclass_setup_arguments", 

927 "is_dataclass_prior_to_mapping", 

928 "allow_unmapped_annotations", 

929 ) 

930 

931 is_deferred = False 

932 registry: _RegistryType 

933 local_table: Optional[FromClause] 

934 persist_selectable: Optional[FromClause] 

935 declared_columns: util.OrderedSet[Column[Any]] 

936 column_ordering: Dict[Column[Any], int] 

937 column_copies: Dict[ 

938 Union[MappedColumn[Any], Column[Any]], 

939 Union[MappedColumn[Any], Column[Any]], 

940 ] 

941 tablename: Optional[str] 

942 mapper_args: Mapping[str, Any] 

943 table_args: Optional[_TableArgsType] 

944 mapper_args_fn: Optional[Callable[[], Dict[str, Any]]] 

945 inherits: Optional[Type[Any]] 

946 single: bool 

947 

948 def __init__( 

949 self, 

950 registry: _RegistryType, 

951 cls_: Type[_O], 

952 dict_: _ClassDict, 

953 ): 

954 # grab class dict before the instrumentation manager has been added. 

955 # reduces cycles 

956 self.clsdict_view = ( 

957 util.immutabledict(dict_) if dict_ else util.EMPTY_DICT 

958 ) 

959 super().__init__(registry, cls_) 

960 self.registry = registry 

961 self.persist_selectable = None 

962 

963 self.collected_attributes = {} 

964 self.collected_annotations = {} 

965 self.declared_columns = util.OrderedSet() 

966 self.column_ordering = {} 

967 self.column_copies = {} 

968 self.single = False 

969 self.dataclass_setup_arguments = dca = getattr( 

970 self.cls, "_sa_apply_dc_transforms", None 

971 ) 

972 

973 self.allow_unmapped_annotations = getattr( 

974 self.cls, "__allow_unmapped__", False 

975 ) or bool(self.dataclass_setup_arguments) 

976 

977 self.is_dataclass_prior_to_mapping = cld = dataclasses.is_dataclass( 

978 cls_ 

979 ) 

980 

981 sdk = _get_immediate_cls_attr(cls_, "__sa_dataclass_metadata_key__") 

982 

983 # we don't want to consume Field objects from a not-already-dataclass. 

984 # the Field objects won't have their "name" or "type" populated, 

985 # and while it seems like we could just set these on Field as we 

986 # read them, Field is documented as "user read only" and we need to 

987 # stay far away from any off-label use of dataclasses APIs. 

988 if (not cld or dca) and sdk: 

989 raise exc.InvalidRequestError( 

990 "SQLAlchemy mapped dataclasses can't consume mapping " 

991 "information from dataclass.Field() objects if the immediate " 

992 "class is not already a dataclass." 

993 ) 

994 

995 # if already a dataclass, and __sa_dataclass_metadata_key__ present, 

996 # then also look inside of dataclass.Field() objects yielded by 

997 # dataclasses.get_fields(cls) when scanning for attributes 

998 self.allow_dataclass_fields = bool(sdk and cld) 

999 

1000 self._setup_declared_events() 

1001 

1002 self._scan_attributes() 

1003 

1004 self._setup_dataclasses_transforms(enable_descriptor_defaults=True) 

1005 

1006 with mapperlib._CONFIGURE_MUTEX: 

1007 clsregistry._add_class( 

1008 self.classname, self.cls, registry._class_registry 

1009 ) 

1010 

1011 self._setup_inheriting_mapper() 

1012 

1013 self._extract_mappable_attributes() 

1014 

1015 self._extract_declared_columns() 

1016 

1017 self._setup_table() 

1018 

1019 self._setup_inheriting_columns() 

1020 

1021 self._early_mapping(util.EMPTY_DICT) 

1022 

1023 def _setup_declared_events(self) -> None: 

1024 if _get_immediate_cls_attr(self.cls, "__declare_last__"): 

1025 

1026 @event.listens_for(Mapper, "after_configured") 

1027 def after_configured() -> None: 

1028 cast( 

1029 "_DeclMappedClassProtocol[Any]", self.cls 

1030 ).__declare_last__() 

1031 

1032 if _get_immediate_cls_attr(self.cls, "__declare_first__"): 

1033 

1034 @event.listens_for(Mapper, "before_configured") 

1035 def before_configured() -> None: 

1036 cast( 

1037 "_DeclMappedClassProtocol[Any]", self.cls 

1038 ).__declare_first__() 

1039 

1040 def _scan_attributes(self) -> None: 

1041 cls = self.cls 

1042 

1043 cls_as_Decl = cast("_DeclMappedClassProtocol[Any]", cls) 

1044 

1045 clsdict_view = self.clsdict_view 

1046 collected_attributes = self.collected_attributes 

1047 column_copies = self.column_copies 

1048 _include_dunders = self._include_dunders 

1049 mapper_args_fn = None 

1050 table_args = inherited_table_args = None 

1051 table_fn = None 

1052 tablename = None 

1053 fixed_table = "__table__" in clsdict_view 

1054 

1055 attribute_is_overridden = self._cls_attr_override_checker(self.cls) 

1056 

1057 bases = [] 

1058 

1059 for base in cls.__mro__: 

1060 # collect bases and make sure standalone columns are copied 

1061 # to be the column they will ultimately be on the class, 

1062 # so that declared_attr functions use the right columns. 

1063 # need to do this all the way up the hierarchy first 

1064 # (see #8190) 

1065 

1066 class_mapped = base is not cls and _is_supercls_for_inherits(base) 

1067 

1068 local_attributes_for_class = self._cls_attr_resolver(base) 

1069 

1070 if not class_mapped and base is not cls: 

1071 locally_collected_columns = self._produce_column_copies( 

1072 local_attributes_for_class, 

1073 attribute_is_overridden, 

1074 fixed_table, 

1075 base, 

1076 ) 

1077 else: 

1078 locally_collected_columns = {} 

1079 

1080 bases.append( 

1081 ( 

1082 base, 

1083 class_mapped, 

1084 local_attributes_for_class, 

1085 locally_collected_columns, 

1086 ) 

1087 ) 

1088 

1089 for ( 

1090 base, 

1091 class_mapped, 

1092 local_attributes_for_class, 

1093 locally_collected_columns, 

1094 ) in bases: 

1095 # this transfer can also take place as we scan each name 

1096 # for finer-grained control of how collected_attributes is 

1097 # populated, as this is what impacts column ordering. 

1098 # however it's simpler to get it out of the way here. 

1099 collected_attributes.update(locally_collected_columns) 

1100 

1101 for ( 

1102 name, 

1103 obj, 

1104 annotation, 

1105 is_dataclass_field, 

1106 ) in local_attributes_for_class(): 

1107 if name in _include_dunders: 

1108 if name == "__mapper_args__": 

1109 check_decl = _check_declared_props_nocascade( 

1110 obj, name, cls 

1111 ) 

1112 if not mapper_args_fn and ( 

1113 not class_mapped or check_decl 

1114 ): 

1115 # don't even invoke __mapper_args__ until 

1116 # after we've determined everything about the 

1117 # mapped table. 

1118 # make a copy of it so a class-level dictionary 

1119 # is not overwritten when we update column-based 

1120 # arguments. 

1121 def _mapper_args_fn() -> Dict[str, Any]: 

1122 return dict(cls_as_Decl.__mapper_args__) 

1123 

1124 mapper_args_fn = _mapper_args_fn 

1125 

1126 elif name == "__tablename__": 

1127 check_decl = _check_declared_props_nocascade( 

1128 obj, name, cls 

1129 ) 

1130 if not tablename and (not class_mapped or check_decl): 

1131 tablename = cls_as_Decl.__tablename__ 

1132 elif name == "__table__": 

1133 check_decl = _check_declared_props_nocascade( 

1134 obj, name, cls 

1135 ) 

1136 # if a @declared_attr using "__table__" is detected, 

1137 # wrap up a callable to look for "__table__" from 

1138 # the final concrete class when we set up a table. 

1139 # this was fixed by 

1140 # #11509, regression in 2.0 from version 1.4. 

1141 if check_decl and not table_fn: 

1142 # don't even invoke __table__ until we're ready 

1143 def _table_fn() -> FromClause: 

1144 return cls_as_Decl.__table__ 

1145 

1146 table_fn = _table_fn 

1147 

1148 elif name == "__table_args__": 

1149 check_decl = _check_declared_props_nocascade( 

1150 obj, name, cls 

1151 ) 

1152 if not table_args and (not class_mapped or check_decl): 

1153 table_args = cls_as_Decl.__table_args__ 

1154 if not isinstance( 

1155 table_args, (tuple, dict, type(None)) 

1156 ): 

1157 raise exc.ArgumentError( 

1158 "__table_args__ value must be a tuple, " 

1159 "dict, or None" 

1160 ) 

1161 if base is not cls: 

1162 inherited_table_args = True 

1163 else: 

1164 # any other dunder names; should not be here 

1165 # as we have tested for all four names in 

1166 # _include_dunders 

1167 assert False 

1168 elif class_mapped: 

1169 if _is_declarative_props(obj) and not obj._quiet: 

1170 util.warn( 

1171 "Regular (i.e. not __special__) " 

1172 "attribute '%s.%s' uses @declared_attr, " 

1173 "but owning class %s is mapped - " 

1174 "not applying to subclass %s." 

1175 % (base.__name__, name, base, cls) 

1176 ) 

1177 

1178 continue 

1179 elif base is not cls: 

1180 # we're a mixin, abstract base, or something that is 

1181 # acting like that for now. 

1182 

1183 if isinstance(obj, (Column, MappedColumn)): 

1184 # already copied columns to the mapped class. 

1185 continue 

1186 elif isinstance(obj, MapperProperty): 

1187 raise exc.InvalidRequestError( 

1188 "Mapper properties (i.e. deferred," 

1189 "column_property(), relationship(), etc.) must " 

1190 "be declared as @declared_attr callables " 

1191 "on declarative mixin classes. For dataclass " 

1192 "field() objects, use a lambda:" 

1193 ) 

1194 elif _is_declarative_props(obj): 

1195 # tried to get overloads to tell this to 

1196 # pylance, no luck 

1197 assert obj is not None 

1198 

1199 if obj._cascading: 

1200 if name in clsdict_view: 

1201 # unfortunately, while we can use the user- 

1202 # defined attribute here to allow a clean 

1203 # override, if there's another 

1204 # subclass below then it still tries to use 

1205 # this. not sure if there is enough 

1206 # information here to add this as a feature 

1207 # later on. 

1208 util.warn( 

1209 "Attribute '%s' on class %s cannot be " 

1210 "processed due to " 

1211 "@declared_attr.cascading; " 

1212 "skipping" % (name, cls) 

1213 ) 

1214 collected_attributes[name] = column_copies[obj] = ( 

1215 ret 

1216 ) = obj.__get__(obj, cls) 

1217 setattr(cls, name, ret) 

1218 else: 

1219 if is_dataclass_field: 

1220 # access attribute using normal class access 

1221 # first, to see if it's been mapped on a 

1222 # superclass. note if the dataclasses.field() 

1223 # has "default", this value can be anything. 

1224 ret = getattr(cls, name, None) 

1225 

1226 # so, if it's anything that's not ORM 

1227 # mapped, assume we should invoke the 

1228 # declared_attr 

1229 if not isinstance(ret, InspectionAttr): 

1230 ret = obj.fget() 

1231 else: 

1232 # access attribute using normal class access. 

1233 # if the declared attr already took place 

1234 # on a superclass that is mapped, then 

1235 # this is no longer a declared_attr, it will 

1236 # be the InstrumentedAttribute 

1237 ret = getattr(cls, name) 

1238 

1239 # correct for proxies created from hybrid_property 

1240 # or similar. note there is no known case that 

1241 # produces nested proxies, so we are only 

1242 # looking one level deep right now. 

1243 

1244 if ( 

1245 isinstance(ret, InspectionAttr) 

1246 and attr_is_internal_proxy(ret) 

1247 and not isinstance( 

1248 ret.original_property, MapperProperty 

1249 ) 

1250 ): 

1251 ret = ret.descriptor 

1252 

1253 collected_attributes[name] = column_copies[obj] = ( 

1254 ret 

1255 ) 

1256 

1257 if ( 

1258 isinstance(ret, (Column, MapperProperty)) 

1259 and ret.doc is None 

1260 ): 

1261 ret.doc = obj.__doc__ 

1262 

1263 self._collect_annotation( 

1264 name, 

1265 obj._collect_return_annotation(), 

1266 base, 

1267 True, 

1268 obj, 

1269 ) 

1270 elif _is_mapped_annotation(annotation, cls, base): 

1271 # Mapped annotation without any object. 

1272 # product_column_copies should have handled this. 

1273 # if future support for other MapperProperty, 

1274 # then test if this name is already handled and 

1275 # otherwise proceed to generate. 

1276 if not fixed_table: 

1277 assert ( 

1278 name in collected_attributes 

1279 or attribute_is_overridden(name, None) 

1280 ) 

1281 continue 

1282 else: 

1283 # here, the attribute is some other kind of 

1284 # property that we assume is not part of the 

1285 # declarative mapping. however, check for some 

1286 # more common mistakes 

1287 self._warn_for_decl_attributes(base, name, obj) 

1288 elif is_dataclass_field and ( 

1289 name not in clsdict_view or clsdict_view[name] is not obj 

1290 ): 

1291 # here, we are definitely looking at the target class 

1292 # and not a superclass. this is currently a 

1293 # dataclass-only path. if the name is only 

1294 # a dataclass field and isn't in local cls.__dict__, 

1295 # put the object there. 

1296 # assert that the dataclass-enabled resolver agrees 

1297 # with what we are seeing 

1298 

1299 assert not attribute_is_overridden(name, obj) 

1300 

1301 if _is_declarative_props(obj): 

1302 obj = obj.fget() 

1303 

1304 collected_attributes[name] = obj 

1305 self._collect_annotation( 

1306 name, annotation, base, False, obj 

1307 ) 

1308 else: 

1309 collected_annotation = self._collect_annotation( 

1310 name, annotation, base, None, obj 

1311 ) 

1312 is_mapped = ( 

1313 collected_annotation is not None 

1314 and collected_annotation.mapped_container is not None 

1315 ) 

1316 generated_obj = ( 

1317 collected_annotation.attr_value 

1318 if collected_annotation is not None 

1319 else obj 

1320 ) 

1321 if obj is None and not fixed_table and is_mapped: 

1322 collected_attributes[name] = ( 

1323 generated_obj 

1324 if generated_obj is not None 

1325 else MappedColumn() 

1326 ) 

1327 elif name in clsdict_view: 

1328 collected_attributes[name] = obj 

1329 # else if the name is not in the cls.__dict__, 

1330 # don't collect it as an attribute. 

1331 # we will see the annotation only, which is meaningful 

1332 # both for mapping and dataclasses setup 

1333 

1334 if inherited_table_args and not tablename: 

1335 table_args = None 

1336 

1337 self.table_args = table_args 

1338 self.tablename = tablename 

1339 self.mapper_args_fn = mapper_args_fn 

1340 self.table_fn = table_fn 

1341 

1342 @classmethod 

1343 def _update_annotations_for_non_mapped_class( 

1344 cls, klass: Type[_O] 

1345 ) -> Mapping[str, _AnnotationScanType]: 

1346 cls_annotations = util.get_annotations(klass) 

1347 

1348 new_anno = {} 

1349 for name, annotation in cls_annotations.items(): 

1350 if _is_mapped_annotation(annotation, klass, klass): 

1351 extracted = _extract_mapped_subtype( 

1352 annotation, 

1353 klass, 

1354 klass.__module__, 

1355 name, 

1356 type(None), 

1357 required=False, 

1358 is_dataclass_field=False, 

1359 expect_mapped=False, 

1360 ) 

1361 if extracted: 

1362 inner, _ = extracted 

1363 new_anno[name] = inner 

1364 else: 

1365 new_anno[name] = annotation 

1366 return new_anno 

1367 

1368 def _warn_for_decl_attributes( 

1369 self, cls: Type[Any], key: str, c: Any 

1370 ) -> None: 

1371 if isinstance(c, expression.ColumnElement): 

1372 util.warn( 

1373 f"Attribute '{key}' on class {cls} appears to " 

1374 "be a non-schema SQLAlchemy expression " 

1375 "object; this won't be part of the declarative mapping. " 

1376 "To map arbitrary expressions, use ``column_property()`` " 

1377 "or a similar function such as ``deferred()``, " 

1378 "``query_expression()`` etc. " 

1379 ) 

1380 

1381 def _produce_column_copies( 

1382 self, 

1383 attributes_for_class: Callable[ 

1384 [], Iterable[Tuple[str, Any, Any, bool]] 

1385 ], 

1386 attribute_is_overridden: Callable[[str, Any], bool], 

1387 fixed_table: bool, 

1388 originating_class: Type[Any], 

1389 ) -> Dict[str, Union[Column[Any], MappedColumn[Any]]]: 

1390 cls = self.cls 

1391 dict_ = self.clsdict_view 

1392 locally_collected_attributes = {} 

1393 column_copies = self.column_copies 

1394 # copy mixin columns to the mapped class 

1395 

1396 for name, obj, annotation, is_dataclass in attributes_for_class(): 

1397 if ( 

1398 not fixed_table 

1399 and obj is None 

1400 and _is_mapped_annotation(annotation, cls, originating_class) 

1401 ): 

1402 # obj is None means this is the annotation only path 

1403 

1404 if attribute_is_overridden(name, obj): 

1405 # perform same "overridden" check as we do for 

1406 # Column/MappedColumn, this is how a mixin col is not 

1407 # applied to an inherited subclass that does not have 

1408 # the mixin. the anno-only path added here for 

1409 # #9564 

1410 continue 

1411 

1412 collected_annotation = self._collect_annotation( 

1413 name, annotation, originating_class, True, obj 

1414 ) 

1415 obj = ( 

1416 collected_annotation.attr_value 

1417 if collected_annotation is not None 

1418 else obj 

1419 ) 

1420 if obj is None: 

1421 obj = MappedColumn() 

1422 

1423 locally_collected_attributes[name] = obj 

1424 setattr(cls, name, obj) 

1425 

1426 elif isinstance(obj, (Column, MappedColumn)): 

1427 if attribute_is_overridden(name, obj): 

1428 # if column has been overridden 

1429 # (like by the InstrumentedAttribute of the 

1430 # superclass), skip. don't collect the annotation 

1431 # either (issue #8718) 

1432 continue 

1433 

1434 collected_annotation = self._collect_annotation( 

1435 name, annotation, originating_class, True, obj 

1436 ) 

1437 obj = ( 

1438 collected_annotation.attr_value 

1439 if collected_annotation is not None 

1440 else obj 

1441 ) 

1442 

1443 if name not in dict_ and not ( 

1444 "__table__" in dict_ 

1445 and (getattr(obj, "name", None) or name) 

1446 in dict_["__table__"].c 

1447 ): 

1448 if obj.foreign_keys: 

1449 for fk in obj.foreign_keys: 

1450 if ( 

1451 fk._table_column is not None 

1452 and fk._table_column.table is None 

1453 ): 

1454 raise exc.InvalidRequestError( 

1455 "Columns with foreign keys to " 

1456 "non-table-bound " 

1457 "columns must be declared as " 

1458 "@declared_attr callables " 

1459 "on declarative mixin classes. " 

1460 "For dataclass " 

1461 "field() objects, use a lambda:." 

1462 ) 

1463 

1464 column_copies[obj] = copy_ = obj._copy() 

1465 

1466 locally_collected_attributes[name] = copy_ 

1467 setattr(cls, name, copy_) 

1468 

1469 return locally_collected_attributes 

1470 

1471 def _extract_mappable_attributes(self) -> None: 

1472 cls = self.cls 

1473 collected_attributes = self.collected_attributes 

1474 

1475 our_stuff = self.properties 

1476 

1477 _include_dunders = self._include_dunders 

1478 

1479 late_mapped = _get_immediate_cls_attr( 

1480 cls, "_sa_decl_prepare_nocascade", strict=True 

1481 ) 

1482 

1483 allow_unmapped_annotations = self.allow_unmapped_annotations 

1484 expect_annotations_wo_mapped = ( 

1485 allow_unmapped_annotations or self.is_dataclass_prior_to_mapping 

1486 ) 

1487 

1488 look_for_dataclass_things = bool(self.dataclass_setup_arguments) 

1489 

1490 for k in list(collected_attributes): 

1491 if k in _include_dunders: 

1492 continue 

1493 

1494 value = collected_attributes[k] 

1495 

1496 if _is_declarative_props(value): 

1497 # @declared_attr in collected_attributes only occurs here for a 

1498 # @declared_attr that's directly on the mapped class; 

1499 # for a mixin, these have already been evaluated 

1500 if value._cascading: 

1501 util.warn( 

1502 "Use of @declared_attr.cascading only applies to " 

1503 "Declarative 'mixin' and 'abstract' classes. " 

1504 "Currently, this flag is ignored on mapped class " 

1505 "%s" % self.cls 

1506 ) 

1507 

1508 value = getattr(cls, k) 

1509 

1510 elif ( 

1511 isinstance(value, QueryableAttribute) 

1512 and value.class_ is not cls 

1513 and value.key != k 

1514 ): 

1515 # detect a QueryableAttribute that's already mapped being 

1516 # assigned elsewhere in userland, turn into a synonym() 

1517 value = SynonymProperty(value.key) 

1518 setattr(cls, k, value) 

1519 

1520 if ( 

1521 isinstance(value, tuple) 

1522 and len(value) == 1 

1523 and isinstance(value[0], (Column, _MappedAttribute)) 

1524 ): 

1525 util.warn( 

1526 "Ignoring declarative-like tuple value of attribute " 

1527 "'%s': possibly a copy-and-paste error with a comma " 

1528 "accidentally placed at the end of the line?" % k 

1529 ) 

1530 continue 

1531 elif look_for_dataclass_things and isinstance( 

1532 value, dataclasses.Field 

1533 ): 

1534 # we collected a dataclass Field; dataclasses would have 

1535 # set up the correct state on the class 

1536 continue 

1537 elif not isinstance(value, (Column, _DCAttributeOptions)): 

1538 # using @declared_attr for some object that 

1539 # isn't Column/MapperProperty/_DCAttributeOptions; remove 

1540 # from the clsdict_view 

1541 # and place the evaluated value onto the class. 

1542 collected_attributes.pop(k) 

1543 self._warn_for_decl_attributes(cls, k, value) 

1544 if not late_mapped: 

1545 setattr(cls, k, value) 

1546 continue 

1547 # we expect to see the name 'metadata' in some valid cases; 

1548 # however at this point we see it's assigned to something trying 

1549 # to be mapped, so raise for that. 

1550 # TODO: should "registry" here be also? might be too late 

1551 # to change that now (2.0 betas) 

1552 elif k in ("metadata",): 

1553 raise exc.InvalidRequestError( 

1554 f"Attribute name '{k}' is reserved when using the " 

1555 "Declarative API." 

1556 ) 

1557 elif isinstance(value, Column): 

1558 _undefer_column_name( 

1559 k, self.column_copies.get(value, value) # type: ignore 

1560 ) 

1561 else: 

1562 if isinstance(value, _IntrospectsAnnotations): 

1563 ( 

1564 annotation, 

1565 mapped_container, 

1566 extracted_mapped_annotation, 

1567 is_dataclass, 

1568 attr_value, 

1569 originating_module, 

1570 originating_class, 

1571 ) = self.collected_annotations.get( 

1572 k, (None, None, None, False, None, None, None) 

1573 ) 

1574 

1575 # issue #8692 - don't do any annotation interpretation if 

1576 # an annotation were present and a container such as 

1577 # Mapped[] etc. were not used. If annotation is None, 

1578 # do declarative_scan so that the property can raise 

1579 # for required 

1580 if ( 

1581 mapped_container is not None 

1582 or annotation is None 

1583 # issue #10516: need to do declarative_scan even with 

1584 # a non-Mapped annotation if we are doing 

1585 # __allow_unmapped__, for things like col.name 

1586 # assignment 

1587 or allow_unmapped_annotations 

1588 ): 

1589 try: 

1590 value.declarative_scan( 

1591 self, 

1592 self.registry, 

1593 cls, 

1594 originating_module, 

1595 k, 

1596 mapped_container, 

1597 annotation, 

1598 extracted_mapped_annotation, 

1599 is_dataclass, 

1600 ) 

1601 except NameError as ne: 

1602 raise orm_exc.MappedAnnotationError( 

1603 f"Could not resolve all types within mapped " 

1604 f'annotation: "{annotation}". Ensure all ' 

1605 f"types are written correctly and are " 

1606 f"imported within the module in use." 

1607 ) from ne 

1608 else: 

1609 # assert that we were expecting annotations 

1610 # without Mapped[] were going to be passed. 

1611 # otherwise an error should have been raised 

1612 # by util._extract_mapped_subtype before we got here. 

1613 assert expect_annotations_wo_mapped 

1614 

1615 if isinstance(value, _DCAttributeOptions): 

1616 if ( 

1617 value._has_dataclass_arguments 

1618 and not look_for_dataclass_things 

1619 ): 

1620 if isinstance(value, MapperProperty): 

1621 argnames = [ 

1622 "init", 

1623 "default_factory", 

1624 "repr", 

1625 "default", 

1626 "dataclass_metadata", 

1627 ] 

1628 else: 

1629 argnames = [ 

1630 "init", 

1631 "default_factory", 

1632 "repr", 

1633 "dataclass_metadata", 

1634 ] 

1635 

1636 args = { 

1637 a 

1638 for a in argnames 

1639 if getattr( 

1640 value._attribute_options, f"dataclasses_{a}" 

1641 ) 

1642 is not _NoArg.NO_ARG 

1643 } 

1644 

1645 raise exc.ArgumentError( 

1646 f"Attribute '{k}' on class {cls} includes " 

1647 f"dataclasses argument(s): " 

1648 f"{', '.join(sorted(repr(a) for a in args))} but " 

1649 f"class does not specify " 

1650 "SQLAlchemy native dataclass configuration." 

1651 ) 

1652 

1653 if not isinstance(value, (MapperProperty, _MapsColumns)): 

1654 # filter for _DCAttributeOptions objects that aren't 

1655 # MapperProperty / mapped_column(). Currently this 

1656 # includes AssociationProxy. pop it from the things 

1657 # we're going to map and set it up as a descriptor 

1658 # on the class. 

1659 collected_attributes.pop(k) 

1660 

1661 # Assoc Prox (or other descriptor object that may 

1662 # use _DCAttributeOptions) is usually here, except if 

1663 # 1. we're a 

1664 # dataclass, dataclasses would have removed the 

1665 # attr here or 2. assoc proxy is coming from a 

1666 # superclass, we want it to be direct here so it 

1667 # tracks state or 3. assoc prox comes from 

1668 # declared_attr, uncommon case 

1669 setattr(cls, k, value) 

1670 continue 

1671 

1672 our_stuff[k] = value 

1673 

1674 def _extract_declared_columns(self) -> None: 

1675 our_stuff = self.properties 

1676 

1677 # extract columns from the class dict 

1678 declared_columns = self.declared_columns 

1679 column_ordering = self.column_ordering 

1680 name_to_prop_key = collections.defaultdict(set) 

1681 

1682 for key, c in list(our_stuff.items()): 

1683 if isinstance(c, _MapsColumns): 

1684 mp_to_assign = c.mapper_property_to_assign 

1685 if mp_to_assign: 

1686 our_stuff[key] = mp_to_assign 

1687 else: 

1688 # if no mapper property to assign, this currently means 

1689 # this is a MappedColumn that will produce a Column for us 

1690 del our_stuff[key] 

1691 

1692 for col, sort_order in c.columns_to_assign: 

1693 if not isinstance(c, CompositeProperty): 

1694 name_to_prop_key[col.name].add(key) 

1695 declared_columns.add(col) 

1696 

1697 # we would assert this, however we want the below 

1698 # warning to take effect instead. See #9630 

1699 # assert col not in column_ordering 

1700 

1701 column_ordering[col] = sort_order 

1702 

1703 # if this is a MappedColumn and the attribute key we 

1704 # have is not what the column has for its key, map the 

1705 # Column explicitly under the attribute key name. 

1706 # otherwise, Mapper will map it under the column key. 

1707 if mp_to_assign is None and key != col.key: 

1708 our_stuff[key] = col 

1709 elif isinstance(c, Column): 

1710 # undefer previously occurred here, and now occurs earlier. 

1711 # ensure every column we get here has been named 

1712 assert c.name is not None 

1713 name_to_prop_key[c.name].add(key) 

1714 declared_columns.add(c) 

1715 # if the column is the same name as the key, 

1716 # remove it from the explicit properties dict. 

1717 # the normal rules for assigning column-based properties 

1718 # will take over, including precedence of columns 

1719 # in multi-column ColumnProperties. 

1720 if key == c.key: 

1721 del our_stuff[key] 

1722 

1723 for name, keys in name_to_prop_key.items(): 

1724 if len(keys) > 1: 

1725 util.warn( 

1726 "On class %r, Column object %r named " 

1727 "directly multiple times, " 

1728 "only one will be used: %s. " 

1729 "Consider using orm.synonym instead" 

1730 % (self.classname, name, (", ".join(sorted(keys)))) 

1731 ) 

1732 

1733 def _setup_table(self, table: Optional[FromClause] = None) -> None: 

1734 cls = self.cls 

1735 cls_as_Decl = cast("MappedClassProtocol[Any]", cls) 

1736 

1737 tablename = self.tablename 

1738 table_args = self.table_args 

1739 clsdict_view = self.clsdict_view 

1740 declared_columns = self.declared_columns 

1741 column_ordering = self.column_ordering 

1742 

1743 manager = attributes.manager_of_class(cls) 

1744 

1745 if ( 

1746 self.table_fn is None 

1747 and "__table__" not in clsdict_view 

1748 and table is None 

1749 ): 

1750 if hasattr(cls, "__table_cls__"): 

1751 table_cls = cast( 

1752 Type[Table], 

1753 util.unbound_method_to_callable(cls.__table_cls__), # type: ignore # noqa: E501 

1754 ) 

1755 else: 

1756 table_cls = Table 

1757 

1758 if tablename is not None: 

1759 args: Tuple[Any, ...] = () 

1760 table_kw: Dict[str, Any] = {} 

1761 

1762 if table_args: 

1763 if isinstance(table_args, dict): 

1764 table_kw = table_args 

1765 elif isinstance(table_args, tuple): 

1766 if isinstance(table_args[-1], dict): 

1767 args, table_kw = table_args[0:-1], table_args[-1] 

1768 else: 

1769 args = table_args 

1770 

1771 autoload_with = clsdict_view.get("__autoload_with__") 

1772 if autoload_with: 

1773 table_kw["autoload_with"] = autoload_with 

1774 

1775 autoload = clsdict_view.get("__autoload__") 

1776 if autoload: 

1777 table_kw["autoload"] = True 

1778 

1779 sorted_columns = sorted( 

1780 declared_columns, 

1781 key=lambda c: column_ordering.get(c, 0), 

1782 ) 

1783 table = self.set_cls_attribute( 

1784 "__table__", 

1785 table_cls( 

1786 tablename, 

1787 self._metadata_for_cls(manager), 

1788 *sorted_columns, 

1789 *args, 

1790 **table_kw, 

1791 ), 

1792 ) 

1793 else: 

1794 if table is None: 

1795 if self.table_fn: 

1796 table = self.set_cls_attribute( 

1797 "__table__", self.table_fn() 

1798 ) 

1799 else: 

1800 table = cls_as_Decl.__table__ 

1801 if declared_columns: 

1802 for c in declared_columns: 

1803 if not table.c.contains_column(c): 

1804 raise exc.ArgumentError( 

1805 "Can't add additional column %r when " 

1806 "specifying __table__" % c.key 

1807 ) 

1808 

1809 self.local_table = table 

1810 

1811 def _metadata_for_cls(self, manager: ClassManager[Any]) -> MetaData: 

1812 meta: Optional[MetaData] = getattr(self.cls, "metadata", None) 

1813 if meta is not None: 

1814 return meta 

1815 else: 

1816 return manager.registry.metadata 

1817 

1818 def _setup_inheriting_mapper(self) -> None: 

1819 cls = self.cls 

1820 

1821 inherits = None 

1822 

1823 if inherits is None: 

1824 # since we search for classical mappings now, search for 

1825 # multiple mapped bases as well and raise an error. 

1826 inherits_search = [] 

1827 for base_ in cls.__bases__: 

1828 c = _resolve_for_abstract_or_classical(base_) 

1829 if c is None: 

1830 continue 

1831 

1832 if _is_supercls_for_inherits(c) and c not in inherits_search: 

1833 inherits_search.append(c) 

1834 

1835 if inherits_search: 

1836 if len(inherits_search) > 1: 

1837 raise exc.InvalidRequestError( 

1838 "Class %s has multiple mapped bases: %r" 

1839 % (cls, inherits_search) 

1840 ) 

1841 inherits = inherits_search[0] 

1842 elif isinstance(inherits, Mapper): 

1843 inherits = inherits.class_ 

1844 

1845 self.inherits = inherits 

1846 

1847 clsdict_view = self.clsdict_view 

1848 if "__table__" not in clsdict_view and self.tablename is None: 

1849 self.single = True 

1850 

1851 def _setup_inheriting_columns(self) -> None: 

1852 table = self.local_table 

1853 cls = self.cls 

1854 table_args = self.table_args 

1855 declared_columns = self.declared_columns 

1856 

1857 if ( 

1858 table is None 

1859 and self.inherits is None 

1860 and not _get_immediate_cls_attr(cls, "__no_table__") 

1861 ): 

1862 raise exc.InvalidRequestError( 

1863 "Class %r does not have a __table__ or __tablename__ " 

1864 "specified and does not inherit from an existing " 

1865 "table-mapped class." % cls 

1866 ) 

1867 elif self.inherits: 

1868 inherited_mapper_or_config = _declared_mapping_info(self.inherits) 

1869 assert inherited_mapper_or_config is not None 

1870 inherited_table = inherited_mapper_or_config.local_table 

1871 inherited_persist_selectable = ( 

1872 inherited_mapper_or_config.persist_selectable 

1873 ) 

1874 

1875 if table is None: 

1876 # single table inheritance. 

1877 # ensure no table args 

1878 if table_args: 

1879 raise exc.ArgumentError( 

1880 "Can't place __table_args__ on an inherited class " 

1881 "with no table." 

1882 ) 

1883 

1884 # add any columns declared here to the inherited table. 

1885 if declared_columns and not isinstance(inherited_table, Table): 

1886 raise exc.ArgumentError( 

1887 f"Can't declare columns on single-table-inherited " 

1888 f"subclass {self.cls}; superclass {self.inherits} " 

1889 "is not mapped to a Table" 

1890 ) 

1891 

1892 for col in declared_columns: 

1893 assert inherited_table is not None 

1894 if col.name in inherited_table.c: 

1895 if inherited_table.c[col.name] is col: 

1896 continue 

1897 raise exc.ArgumentError( 

1898 f"Column '{col}' on class {cls.__name__} " 

1899 f"conflicts with existing column " 

1900 f"'{inherited_table.c[col.name]}'. If using " 

1901 f"Declarative, consider using the " 

1902 "use_existing_column parameter of mapped_column() " 

1903 "to resolve conflicts." 

1904 ) 

1905 if col.primary_key: 

1906 raise exc.ArgumentError( 

1907 "Can't place primary key columns on an inherited " 

1908 "class with no table." 

1909 ) 

1910 

1911 if TYPE_CHECKING: 

1912 assert isinstance(inherited_table, Table) 

1913 

1914 inherited_table.append_column(col) 

1915 if ( 

1916 inherited_persist_selectable is not None 

1917 and inherited_persist_selectable is not inherited_table 

1918 ): 

1919 inherited_persist_selectable._refresh_for_new_column( 

1920 col 

1921 ) 

1922 

1923 def _prepare_mapper_arguments(self, mapper_kw: _MapperKwArgs) -> None: 

1924 properties = self.properties 

1925 

1926 if self.mapper_args_fn: 

1927 mapper_args = self.mapper_args_fn() 

1928 else: 

1929 mapper_args = {} 

1930 

1931 if mapper_kw: 

1932 mapper_args.update(mapper_kw) 

1933 

1934 if "properties" in mapper_args: 

1935 properties = dict(properties) 

1936 properties.update(mapper_args["properties"]) 

1937 

1938 # make sure that column copies are used rather 

1939 # than the original columns from any mixins 

1940 for k in ("version_id_col", "polymorphic_on"): 

1941 if k in mapper_args: 

1942 v = mapper_args[k] 

1943 mapper_args[k] = self.column_copies.get(v, v) 

1944 

1945 if "primary_key" in mapper_args: 

1946 mapper_args["primary_key"] = [ 

1947 self.column_copies.get(v, v) 

1948 for v in util.to_list(mapper_args["primary_key"]) 

1949 ] 

1950 

1951 if "inherits" in mapper_args: 

1952 inherits_arg = mapper_args["inherits"] 

1953 if isinstance(inherits_arg, Mapper): 

1954 inherits_arg = inherits_arg.class_ 

1955 

1956 if inherits_arg is not self.inherits: 

1957 raise exc.InvalidRequestError( 

1958 "mapper inherits argument given for non-inheriting " 

1959 "class %s" % (mapper_args["inherits"]) 

1960 ) 

1961 

1962 if self.inherits: 

1963 mapper_args["inherits"] = self.inherits 

1964 

1965 if self.inherits and not mapper_args.get("concrete", False): 

1966 # note the superclass is expected to have a Mapper assigned and 

1967 # not be a deferred config, as this is called within map() 

1968 inherited_mapper = class_mapper(self.inherits, False) 

1969 inherited_table = inherited_mapper.local_table 

1970 

1971 # single or joined inheritance 

1972 # exclude any cols on the inherited table which are 

1973 # not mapped on the parent class, to avoid 

1974 # mapping columns specific to sibling/nephew classes 

1975 if "exclude_properties" not in mapper_args: 

1976 mapper_args["exclude_properties"] = exclude_properties = { 

1977 c.key 

1978 for c in inherited_table.c 

1979 if c not in inherited_mapper._columntoproperty 

1980 }.union(inherited_mapper.exclude_properties or ()) 

1981 exclude_properties.difference_update( 

1982 [c.key for c in self.declared_columns] 

1983 ) 

1984 

1985 # look through columns in the current mapper that 

1986 # are keyed to a propname different than the colname 

1987 # (if names were the same, we'd have popped it out above, 

1988 # in which case the mapper makes this combination). 

1989 # See if the superclass has a similar column property. 

1990 # If so, join them together. 

1991 for k, col in list(properties.items()): 

1992 if not isinstance(col, expression.ColumnElement): 

1993 continue 

1994 if k in inherited_mapper._props: 

1995 p = inherited_mapper._props[k] 

1996 if isinstance(p, ColumnProperty): 

1997 # note here we place the subclass column 

1998 # first. See [ticket:1892] for background. 

1999 properties[k] = [col] + p.columns 

2000 result_mapper_args = mapper_args.copy() 

2001 result_mapper_args["properties"] = properties 

2002 self.mapper_args = result_mapper_args 

2003 

2004 def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]: 

2005 self._prepare_mapper_arguments(mapper_kw) 

2006 if hasattr(self.cls, "__mapper_cls__"): 

2007 mapper_cls = cast( 

2008 "Type[Mapper[Any]]", 

2009 util.unbound_method_to_callable( 

2010 self.cls.__mapper_cls__ # type: ignore 

2011 ), 

2012 ) 

2013 else: 

2014 mapper_cls = Mapper 

2015 

2016 return self.set_cls_attribute( 

2017 "__mapper__", 

2018 mapper_cls(self.cls, self.local_table, **self.mapper_args), 

2019 ) 

2020 

2021 

2022class _UnmappedDataclassConfig(_ClassScanAbstractConfig): 

2023 """Configurator that will produce an unmapped dataclass.""" 

2024 

2025 __slots__ = ( 

2026 "clsdict_view", 

2027 "collected_attributes", 

2028 "collected_annotations", 

2029 "allow_dataclass_fields", 

2030 "dataclass_setup_arguments", 

2031 "is_dataclass_prior_to_mapping", 

2032 "allow_unmapped_annotations", 

2033 ) 

2034 

2035 def __init__( 

2036 self, 

2037 cls_: Type[_O], 

2038 dict_: _ClassDict, 

2039 ): 

2040 super().__init__(cls_) 

2041 self.clsdict_view = ( 

2042 util.immutabledict(dict_) if dict_ else util.EMPTY_DICT 

2043 ) 

2044 self.dataclass_setup_arguments = getattr( 

2045 self.cls, "_sa_apply_dc_transforms", None 

2046 ) 

2047 

2048 self.is_dataclass_prior_to_mapping = dataclasses.is_dataclass(cls_) 

2049 self.allow_dataclass_fields = False 

2050 self.allow_unmapped_annotations = True 

2051 self.collected_attributes = {} 

2052 self.collected_annotations = {} 

2053 

2054 self._scan_attributes() 

2055 

2056 self._setup_dataclasses_transforms( 

2057 enable_descriptor_defaults=False, revert=True 

2058 ) 

2059 

2060 def _scan_attributes(self) -> None: 

2061 cls = self.cls 

2062 

2063 clsdict_view = self.clsdict_view 

2064 collected_attributes = self.collected_attributes 

2065 _include_dunders = self._include_dunders 

2066 

2067 attribute_is_overridden = self._cls_attr_override_checker(self.cls) 

2068 

2069 local_attributes_for_class = self._cls_attr_resolver(cls) 

2070 for ( 

2071 name, 

2072 obj, 

2073 annotation, 

2074 is_dataclass_field, 

2075 ) in local_attributes_for_class(): 

2076 if name in _include_dunders: 

2077 continue 

2078 elif is_dataclass_field and ( 

2079 name not in clsdict_view or clsdict_view[name] is not obj 

2080 ): 

2081 # here, we are definitely looking at the target class 

2082 # and not a superclass. this is currently a 

2083 # dataclass-only path. if the name is only 

2084 # a dataclass field and isn't in local cls.__dict__, 

2085 # put the object there. 

2086 # assert that the dataclass-enabled resolver agrees 

2087 # with what we are seeing 

2088 

2089 assert not attribute_is_overridden(name, obj) 

2090 

2091 if _is_declarative_props(obj): 

2092 obj = obj.fget() 

2093 

2094 collected_attributes[name] = obj 

2095 self._collect_annotation(name, annotation, cls, False, obj) 

2096 else: 

2097 self._collect_annotation(name, annotation, cls, None, obj) 

2098 if name in clsdict_view: 

2099 collected_attributes[name] = obj 

2100 

2101 

2102@util.preload_module("sqlalchemy.orm.decl_api") 

2103def _as_dc_declaredattr( 

2104 field_metadata: Mapping[str, Any], sa_dataclass_metadata_key: str 

2105) -> Any: 

2106 # wrap lambdas inside dataclass fields inside an ad-hoc declared_attr. 

2107 # we can't write it because field.metadata is immutable :( so we have 

2108 # to go through extra trouble to compare these 

2109 decl_api = util.preloaded.orm_decl_api 

2110 obj = field_metadata[sa_dataclass_metadata_key] 

2111 if callable(obj) and not isinstance(obj, decl_api.declared_attr): 

2112 return decl_api.declared_attr(obj) 

2113 else: 

2114 return obj 

2115 

2116 

2117class _DeferredDeclarativeConfig(_DeclarativeMapperConfig): 

2118 """Configurator that extends _DeclarativeMapperConfig to add a 

2119 "deferred" step, to allow extensions like AbstractConcreteBase, 

2120 DeferredMapping to partially set up a mapping that is "prepared" 

2121 when table metadata is ready. 

2122 

2123 """ 

2124 

2125 _cls: weakref.ref[Type[Any]] 

2126 

2127 is_deferred = True 

2128 

2129 _configs: util.OrderedDict[ 

2130 weakref.ref[Type[Any]], _DeferredDeclarativeConfig 

2131 ] = util.OrderedDict() 

2132 

2133 def _early_mapping(self, mapper_kw: _MapperKwArgs) -> None: 

2134 pass 

2135 

2136 @property 

2137 def cls(self) -> Type[Any]: 

2138 return self._cls() # type: ignore 

2139 

2140 @cls.setter 

2141 def cls(self, class_: Type[Any]) -> None: 

2142 self._cls = weakref.ref(class_, self._remove_config_cls) 

2143 self._configs[self._cls] = self 

2144 

2145 @classmethod 

2146 def _remove_config_cls(cls, ref: weakref.ref[Type[Any]]) -> None: 

2147 cls._configs.pop(ref, None) 

2148 

2149 @classmethod 

2150 def has_cls(cls, class_: Type[Any]) -> bool: 

2151 # 2.6 fails on weakref if class_ is an old style class 

2152 return isinstance(class_, type) and weakref.ref(class_) in cls._configs 

2153 

2154 @classmethod 

2155 def raise_unmapped_for_cls(cls, class_: Type[Any]) -> NoReturn: 

2156 if hasattr(class_, "_sa_raise_deferred_config"): 

2157 class_._sa_raise_deferred_config() 

2158 

2159 raise orm_exc.UnmappedClassError( 

2160 class_, 

2161 msg=( 

2162 f"Class {orm_exc._safe_cls_name(class_)} has a deferred " 

2163 "mapping on it. It is not yet usable as a mapped class." 

2164 ), 

2165 ) 

2166 

2167 @classmethod 

2168 def config_for_cls(cls, class_: Type[Any]) -> _DeferredDeclarativeConfig: 

2169 return cls._configs[weakref.ref(class_)] 

2170 

2171 @classmethod 

2172 def classes_for_base( 

2173 cls, base_cls: Type[Any], sort: bool = True 

2174 ) -> List[_DeferredDeclarativeConfig]: 

2175 classes_for_base = [ 

2176 m 

2177 for m, cls_ in [(m, m.cls) for m in cls._configs.values()] 

2178 if cls_ is not None and issubclass(cls_, base_cls) 

2179 ] 

2180 

2181 if not sort: 

2182 return classes_for_base 

2183 

2184 all_m_by_cls = {m.cls: m for m in classes_for_base} 

2185 

2186 tuples: List[ 

2187 Tuple[_DeferredDeclarativeConfig, _DeferredDeclarativeConfig] 

2188 ] = [] 

2189 for m_cls in all_m_by_cls: 

2190 tuples.extend( 

2191 (all_m_by_cls[base_cls], all_m_by_cls[m_cls]) 

2192 for base_cls in m_cls.__bases__ 

2193 if base_cls in all_m_by_cls 

2194 ) 

2195 return list(topological.sort(tuples, classes_for_base)) 

2196 

2197 def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]: 

2198 self._configs.pop(self._cls, None) 

2199 return super().map(mapper_kw) 

2200 

2201 

2202def _add_attribute( 

2203 cls: Type[Any], key: str, value: MapperProperty[Any] 

2204) -> None: 

2205 """add an attribute to an existing declarative class. 

2206 

2207 This runs through the logic to determine MapperProperty, 

2208 adds it to the Mapper, adds a column to the mapped Table, etc. 

2209 

2210 """ 

2211 

2212 if "__mapper__" in cls.__dict__: 

2213 mapped_cls = cast("MappedClassProtocol[Any]", cls) 

2214 

2215 def _table_or_raise(mc: MappedClassProtocol[Any]) -> Table: 

2216 if isinstance(mc.__table__, Table): 

2217 return mc.__table__ 

2218 raise exc.InvalidRequestError( 

2219 f"Cannot add a new attribute to mapped class {mc.__name__!r} " 

2220 "because it's not mapped against a table." 

2221 ) 

2222 

2223 if isinstance(value, Column): 

2224 _undefer_column_name(key, value) 

2225 _table_or_raise(mapped_cls).append_column( 

2226 value, replace_existing=True 

2227 ) 

2228 mapped_cls.__mapper__.add_property(key, value) 

2229 elif isinstance(value, _MapsColumns): 

2230 mp = value.mapper_property_to_assign 

2231 for col, _ in value.columns_to_assign: 

2232 _undefer_column_name(key, col) 

2233 _table_or_raise(mapped_cls).append_column( 

2234 col, replace_existing=True 

2235 ) 

2236 if not mp: 

2237 mapped_cls.__mapper__.add_property(key, col) 

2238 if mp: 

2239 mapped_cls.__mapper__.add_property(key, mp) 

2240 elif isinstance(value, MapperProperty): 

2241 mapped_cls.__mapper__.add_property(key, value) 

2242 elif isinstance(value, QueryableAttribute) and value.key != key: 

2243 # detect a QueryableAttribute that's already mapped being 

2244 # assigned elsewhere in userland, turn into a synonym() 

2245 value = SynonymProperty(value.key) 

2246 mapped_cls.__mapper__.add_property(key, value) 

2247 else: 

2248 type.__setattr__(cls, key, value) 

2249 mapped_cls.__mapper__._expire_memoizations() 

2250 else: 

2251 type.__setattr__(cls, key, value) 

2252 

2253 

2254def _del_attribute(cls: Type[Any], key: str) -> None: 

2255 if ( 

2256 "__mapper__" in cls.__dict__ 

2257 and key in cls.__dict__ 

2258 and not cast( 

2259 "MappedClassProtocol[Any]", cls 

2260 ).__mapper__._dispose_called 

2261 ): 

2262 value = cls.__dict__[key] 

2263 if isinstance( 

2264 value, (Column, _MapsColumns, MapperProperty, QueryableAttribute) 

2265 ): 

2266 raise NotImplementedError( 

2267 "Can't un-map individual mapped attributes on a mapped class." 

2268 ) 

2269 else: 

2270 type.__delattr__(cls, key) 

2271 cast( 

2272 "MappedClassProtocol[Any]", cls 

2273 ).__mapper__._expire_memoizations() 

2274 else: 

2275 type.__delattr__(cls, key) 

2276 

2277 

2278def _declarative_constructor(self: Any, **kwargs: Any) -> None: 

2279 """A simple constructor that allows initialization from kwargs. 

2280 

2281 Sets attributes on the constructed instance using the names and 

2282 values in ``kwargs``. 

2283 

2284 Only keys that are present as 

2285 attributes of the instance's class are allowed. These could be, 

2286 for example, any mapped columns or relationships. 

2287 """ 

2288 cls_ = type(self) 

2289 for k in kwargs: 

2290 if not hasattr(cls_, k): 

2291 raise TypeError( 

2292 "%r is an invalid keyword argument for %s" % (k, cls_.__name__) 

2293 ) 

2294 setattr(self, k, kwargs[k]) 

2295 

2296 

2297_declarative_constructor.__name__ = "__init__" 

2298 

2299 

2300def _undefer_column_name(key: str, column: Column[Any]) -> None: 

2301 if column.key is None: 

2302 column.key = key 

2303 if column.name is None: 

2304 column.name = key