Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/descriptor_props.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

504 statements  

1# orm/descriptor_props.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Descriptor properties are more "auxiliary" properties 

9that exist as configurational elements, but don't participate 

10as actively in the load/persist ORM loop. 

11 

12""" 

13from __future__ import annotations 

14 

15from dataclasses import is_dataclass 

16import inspect 

17import itertools 

18import operator 

19import typing 

20from typing import Any 

21from typing import Callable 

22from typing import Dict 

23from typing import List 

24from typing import NoReturn 

25from typing import Optional 

26from typing import Sequence 

27from typing import Tuple 

28from typing import Type 

29from typing import TYPE_CHECKING 

30from typing import TypeVar 

31from typing import Union 

32import weakref 

33 

34from . import attributes 

35from . import util as orm_util 

36from .base import _DeclarativeMapped 

37from .base import DONT_SET 

38from .base import LoaderCallableStatus 

39from .base import Mapped 

40from .base import PassiveFlag 

41from .base import SQLORMOperations 

42from .interfaces import _AttributeOptions 

43from .interfaces import _IntrospectsAnnotations 

44from .interfaces import _MapsColumns 

45from .interfaces import MapperProperty 

46from .interfaces import PropComparator 

47from .util import _none_set 

48from .util import de_stringify_annotation 

49from .. import event 

50from .. import exc as sa_exc 

51from .. import schema 

52from .. import sql 

53from .. import util 

54from ..sql import expression 

55from ..sql import operators 

56from ..sql.base import _NoArg 

57from ..sql.elements import BindParameter 

58from ..util.typing import get_args 

59from ..util.typing import is_fwd_ref 

60from ..util.typing import is_pep593 

61from ..util.typing import TupleAny 

62from ..util.typing import Unpack 

63 

64 

65if typing.TYPE_CHECKING: 

66 from ._typing import _InstanceDict 

67 from ._typing import _RegistryType 

68 from .attributes import History 

69 from .attributes import InstrumentedAttribute 

70 from .attributes import QueryableAttribute 

71 from .context import _ORMCompileState 

72 from .decl_base import _ClassScanMapperConfig 

73 from .interfaces import _DataclassArguments 

74 from .mapper import Mapper 

75 from .properties import ColumnProperty 

76 from .properties import MappedColumn 

77 from .state import InstanceState 

78 from ..engine.base import Connection 

79 from ..engine.row import Row 

80 from ..sql._typing import _DMLColumnArgument 

81 from ..sql._typing import _InfoType 

82 from ..sql.elements import ClauseList 

83 from ..sql.elements import ColumnElement 

84 from ..sql.operators import OperatorType 

85 from ..sql.schema import Column 

86 from ..sql.selectable import Select 

87 from ..util.typing import _AnnotationScanType 

88 from ..util.typing import CallableReference 

89 from ..util.typing import DescriptorReference 

90 from ..util.typing import RODescriptorReference 

91 

# Type parameter for the Python-side value a mapped property exposes
# (used below as ``MapperProperty[_T]``).
_T = TypeVar("_T", bound=Any)

# Independent type parameter used below for ``PropComparator[_PT]``.
_PT = TypeVar("_PT", bound=Any)

94 

95 

class DescriptorProperty(MapperProperty[_T]):
    """:class:`.MapperProperty` whose attribute access is delegated to a
    Python descriptor.

    ``descriptor`` may be ``None`` when :meth:`instrument_class` runs; in
    that case a userland descriptor found on the mapped class is adopted,
    or a plain ``property`` forwarding to ``self.name`` is synthesized.
    This class calls ``self._comparator_factory(mapper)`` and reads
    ``self.name`` without defining either itself.
    """

    # documentation string passed to the proxied class-level attribute
    doc: Optional[str] = None

    uses_objects = False
    _links_to_entity = False

    # the descriptor that is installed on the mapped class
    descriptor: DescriptorReference[Any]

    def _column_strategy_attrs(self) -> Sequence[QueryableAttribute[Any]]:
        """Not supported at this level; always raises NotImplementedError."""
        raise NotImplementedError(
            "This MapperProperty does not implement column loader strategies"
        )

    def get_history(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
    ) -> History:
        """Not implemented here; always raises NotImplementedError."""
        raise NotImplementedError()

    def instrument_class(self, mapper: Mapper[Any]) -> None:
        """Install a proxied attribute for ``self.key`` on the mapper's
        class manager.

        :param mapper: mapper whose class is inspected for an existing
         userland descriptor and whose ``class_manager`` receives the
         proxied attribute.
        """
        owner = self

        class _ProxyImpl(attributes._AttributeImpl):
            # minimal impl object: it only carries the key and defers
            # everything else back to the owning property
            accepts_scalar_loader = False
            load_on_unexpire = True
            collection = False

            def __init__(self, key: str):
                self.key = key

            @property
            def uses_objects(self) -> bool:  # type: ignore
                return owner.uses_objects

            def get_history(
                self,
                state: InstanceState[Any],
                dict_: _InstanceDict,
                passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
            ) -> History:
                return owner.get_history(state, dict_, passive)

        # step 1: adopt a descriptor the user already placed on the class
        if self.descriptor is None:
            candidate = getattr(mapper.class_, self.key, None)
            if mapper._is_userland_descriptor(self.key, candidate):
                self.descriptor = candidate

        # step 2: otherwise build a property forwarding to ``self.name``
        if self.descriptor is None:

            def fget(obj: Any) -> Any:
                return getattr(obj, self.name)

            def fset(obj: Any, value: Any) -> None:
                setattr(obj, self.name, value)

            def fdel(obj: Any) -> None:
                delattr(obj, self.name)

            self.descriptor = property(fget=fget, fset=fset, fdel=fdel)

        proxy_factory = attributes._create_proxied_attribute(self.descriptor)
        proxy_attr = proxy_factory(
            self.parent.class_,
            self.key,
            self.descriptor,
            # comparator is produced lazily, on first use
            lambda: self._comparator_factory(mapper),
            doc=self.doc,
            original_property=self,
        )

        proxy_attr.impl = _ProxyImpl(self.key)
        mapper.class_manager.instrument_attribute(self.key, proxy_attr)

172 

173 

# Forms accepted for each column-like argument of a composite: an
# attribute name, or a Column / MappedColumn / mapped attribute object.
_CompositeAttrType = Union[
    str,
    "Column[_T]",
    "MappedColumn[_T]",
    "InstrumentedAttribute[_T]",
    "Mapped[_T]",
]


# Type parameter for the user-supplied composite class.
_CC = TypeVar("_CC", bound=Any)


# Maps a composite class (weakly referenced) to a callable that returns the
# tuple of column values for an instance of that class.  Entries are added
# by CompositeProperty._init_accessor().
_composite_getters: weakref.WeakKeyDictionary[
    Type[Any], Callable[[Any], Tuple[Any, ...]]
] = weakref.WeakKeyDictionary()

189 

190 

class CompositeProperty(
    _MapsColumns[_CC], _IntrospectsAnnotations, DescriptorProperty[_CC]
):
    """A "composite" mapped attribute: several columns presented as a
    single attribute whose value is an instance of a user-defined class.

    :class:`.CompositeProperty` is constructed using the :func:`.composite`
    function.

    .. seealso::

        :ref:`mapper_composite`

    """

    # class (or callable) invoked with the column values to build the
    # composite object; may be None until declarative_scan() resolves it
    composite_class: Union[Type[_CC], Callable[..., _CC]]

    # the column-like arguments, in the order passed to the constructor
    attrs: Tuple[_CompositeAttrType[Any], ...]

    # accessor generated for dataclasses lacking __composite_values__();
    # None when no accessor was generated
    _generated_composite_accessor: CallableReference[
        Optional[Callable[[_CC], Tuple[Any, ...]]]
    ]

    # comparator class used to build SQL expressions for the attribute
    comparator_factory: Type[Comparator[_CC]]

214 

def __init__(
    self,
    _class_or_attr: Union[
        None, Type[_CC], Callable[..., _CC], _CompositeAttrType[Any]
    ] = None,
    *attrs: _CompositeAttrType[Any],
    attribute_options: Optional[_AttributeOptions] = None,
    active_history: bool = False,
    deferred: bool = False,
    group: Optional[str] = None,
    comparator_factory: Optional[Type[Comparator[_CC]]] = None,
    info: Optional[_InfoType] = None,
    **kwargs: Any,
):
    """Construct the composite property.

    :param _class_or_attr: either the composite class/callable, or the
     first column-like argument.  A ``Mapped``, ``str`` or
     ``sql.ColumnElement`` instance is taken to be a column argument, in
     which case ``composite_class`` stays ``None`` for now.
    :param attrs: remaining column-like arguments.
    :param attribute_options: forwarded to the superclass constructor.
    :param active_history: stored on the property.
    :param deferred: stored on the property.
    :param group: stored on the property.
    :param comparator_factory: comparator class; defaults to the
     ``Comparator`` attribute of this property's class.
    :param info: optional mapping merged into ``self.info``.
    :param kwargs: accepted but not used by this constructor.
    """
    super().__init__(attribute_options=attribute_options)

    first_arg_is_column = isinstance(
        _class_or_attr, (Mapped, str, sql.ColumnElement)
    )
    if first_arg_is_column:
        # will initialize within declarative_scan
        self.composite_class = None  # type: ignore
        self.attrs = (_class_or_attr,) + attrs
    else:
        self.composite_class = _class_or_attr  # type: ignore
        self.attrs = attrs

    self.active_history = active_history
    self.deferred = deferred
    self.group = group

    if comparator_factory is None:
        comparator_factory = self.__class__.Comparator
    self.comparator_factory = comparator_factory

    self._generated_composite_accessor = None
    if info is not None:
        self.info.update(info)

    util.set_creation_order(self)
    self._create_descriptor()
    self._init_accessor()

254 

def instrument_class(self, mapper: Mapper[Any]) -> None:
    """Run the superclass instrumentation, then register the event
    handlers set up by ``_setup_event_handlers()``.

    :param mapper: passed through to the superclass implementation.
    """
    super().instrument_class(mapper)
    self._setup_event_handlers()

258 

def _composite_values_from_instance(self, value: _CC) -> Tuple[Any, ...]:
    """Return the column values held by the composite object ``value``.

    Uses the generated accessor when one exists; otherwise calls the
    object's ``__composite_values__()`` method.

    :param value: an instance of the composite class.
    :return: whatever the accessor or ``__composite_values__()`` returns.
    :raises sqlalchemy.exc.InvalidRequestError: if no accessor was
     generated and ``value`` has no ``__composite_values__`` attribute.
    """
    if self._generated_composite_accessor:
        return self._generated_composite_accessor(value)

    try:
        accessor = value.__composite_values__
    except AttributeError as ae:
        # composite_class may be a callable without __name__ (or None);
        # fall back to repr() so the intended error is raised rather than
        # a second AttributeError from building the message.
        class_name = getattr(
            self.composite_class, "__name__", repr(self.composite_class)
        )
        raise sa_exc.InvalidRequestError(
            f"Composite class {class_name} is not "
            f"a dataclass and does not define a __composite_values__()"
            " method; can't get state"
        ) from ae
    else:
        return accessor()  # type: ignore

273 

def do_init(self) -> None:
    """Post-association initialization step.

    Delegates to ``_setup_arguments_on_columns()`` so that options given to
    the composite are applied to its column properties.

    """
    self._setup_arguments_on_columns()

280 

# Unique sentinel passed as the "context" argument of the refresh event
# when that event is fired from the composite's own fget().
_COMPOSITE_FGET = object()

282 

def _create_descriptor(self) -> None:
    """Create the Python descriptor that will serve as
    the access point on instances of the mapped class.

    Builds ``fget`` / ``fset`` / ``fdel`` closures over this property and
    stores ``property(fget, fset, fdel)`` on ``self.descriptor``.

    """

    def fget(instance: Any) -> Any:
        """Return the composite value, creating it on demand."""
        dict_ = attributes.instance_dict(instance)
        state = attributes.instance_state(instance)

        if self.key not in dict_:
            # key not present.  Iterate through related
            # attributes, retrieve their values.  This
            # ensures they all load.
            values = [getattr(instance, key) for key in self._attribute_keys]

            # current expected behavior here is that the composite is
            # created on access if the object is persistent or if
            # col attributes have non-None.  This would be better
            # if the composite were created unconditionally,
            # but that would be a behavioral change.
            #
            # The key is re-tested because the attribute access above may
            # already have populated it -- presumably via the refresh
            # handler registered for this property.
            if self.key not in dict_ and (
                state.key is not None or not _none_set.issuperset(values)
            ):
                dict_[self.key] = self.composite_class(*values)
                state.manager.dispatch.refresh(
                    state, self._COMPOSITE_FGET, [self.key]
                )

        return dict_.get(self.key, None)

    def fset(instance: Any, value: Any) -> None:
        """Store a composite value and copy its parts to the columns."""
        if value is LoaderCallableStatus.DONT_SET:
            return

        dict_ = attributes.instance_dict(instance)
        state = attributes.instance_state(instance)
        attr = state.manager[self.key]

        if attr.dispatch._active_history:
            previous = fget(instance)
        else:
            previous = dict_.get(self.key, LoaderCallableStatus.NO_VALUE)

        # each "set" listener may replace the incoming value
        for fn in attr.dispatch.set:
            value = fn(state, value, previous, attr.impl)
        dict_[self.key] = value

        if value is None:
            for key in self._attribute_keys:
                setattr(instance, key, None)
        else:
            # a distinct loop name keeps ``value`` bound to the composite
            column_values = self._composite_values_from_instance(value)
            for key, column_value in zip(self._attribute_keys, column_values):
                setattr(instance, key, column_value)

    def fdel(instance: Any) -> None:
        """Remove the composite value and set every column to None."""
        state = attributes.instance_state(instance)
        dict_ = attributes.instance_dict(instance)
        attr = state.manager[self.key]

        if attr.dispatch._active_history:
            previous = fget(instance)
            dict_.pop(self.key, None)
        else:
            previous = dict_.pop(self.key, LoaderCallableStatus.NO_VALUE)

        # ``attr`` from above is reused; it was previously looked up twice
        attr.dispatch.remove(state, previous, attr.impl)
        for key in self._attribute_keys:
            setattr(instance, key, None)

    self.descriptor = property(fget, fset, fdel)

359 

@util.preload_module("sqlalchemy.orm.properties")
def declarative_scan(
    self,
    decl_scan: _ClassScanMapperConfig,
    registry: _RegistryType,
    cls: Type[Any],
    originating_module: Optional[str],
    key: str,
    mapped_container: Optional[Type[Mapped[Any]]],
    annotation: Optional[_AnnotationScanType],
    extracted_mapped_annotation: Optional[_AnnotationScanType],
    is_dataclass_field: bool,
) -> None:
    """Resolve the composite class from the annotation if it was not given
    explicitly, then validate or configure the column arguments.

    When ``composite_class`` is unset it is taken from
    ``extracted_mapped_annotation`` (unwrapped if PEP 593, de-stringified
    if a string / forward reference).  Dataclass composites are handed to
    ``_setup_for_dataclass()``; for anything else every Column /
    MappedColumn argument must already have a name.

    :raises sqlalchemy.exc.ArgumentError: for a forward reference when
     ``originating_module`` is None, or for an unnamed column argument on
     a non-dataclass composite.
    """
    MappedColumn = util.preloaded.orm_properties.MappedColumn

    if self.composite_class is None and extracted_mapped_annotation is None:
        self._raise_for_required(key, cls)

    argument = extracted_mapped_annotation
    if is_pep593(argument):
        argument = get_args(argument)[0]

    if argument and self.composite_class is None:
        is_forward = isinstance(argument, str) or is_fwd_ref(
            argument, check_generic=True
        )
        if is_forward:
            if originating_module is None:
                if hasattr(argument, "__forward_arg__"):
                    str_arg = argument.__forward_arg__
                else:
                    str_arg = str(argument)
                raise sa_exc.ArgumentError(
                    f"Can't use forward ref {argument} for composite "
                    f"class argument; set up the type as Mapped[{str_arg}]"
                )
            argument = de_stringify_annotation(
                cls, argument, originating_module, include_generic=True
            )

        self.composite_class = argument

    if is_dataclass(self.composite_class):
        self._setup_for_dataclass(
            decl_scan, registry, cls, originating_module, key
        )
    elif any(
        isinstance(attr, (MappedColumn, schema.Column)) and attr.name is None
        for attr in self.attrs
    ):
        raise sa_exc.ArgumentError(
            "Composite class column arguments must be named "
            "unless a dataclass is used"
        )

    self._init_accessor()

419 

def _init_accessor(self) -> None:
    """Prepare the callable that extracts column values from a composite
    instance, and record it in the module-level ``_composite_getters``.

    For a dataclass without ``__composite_values__`` an
    ``operator.attrgetter`` over the constructor's parameter names is
    generated; for a single parameter it is wrapped so that a 1-tuple is
    returned.
    """
    composite_cls = self.composite_class

    if is_dataclass(composite_cls) and not hasattr(
        composite_cls, "__composite_values__"
    ):
        # the keys of Signature.parameters are the parameter names
        param_names = list(inspect.signature(composite_cls).parameters)
        getter = operator.attrgetter(*param_names)
        if len(param_names) == 1:
            # attrgetter with one name returns a bare value, not a tuple
            self._generated_composite_accessor = lambda obj: (getter(obj),)
        else:
            self._generated_composite_accessor = getter

    # only real classes that are not yet recorded are added
    if (
        composite_cls is None
        or not isinstance(composite_cls, type)
        or composite_cls in _composite_getters
    ):
        return

    if self._generated_composite_accessor is not None:
        _composite_getters[composite_cls] = (
            self._generated_composite_accessor
        )
    elif hasattr(composite_cls, "__composite_values__"):
        _composite_getters[composite_cls] = (
            lambda obj: obj.__composite_values__()
        )

446 

@util.preload_module("sqlalchemy.orm.properties")
@util.preload_module("sqlalchemy.orm.decl_base")
def _setup_for_dataclass(
    self,
    decl_scan: _ClassScanMapperConfig,
    registry: _RegistryType,
    cls: Type[Any],
    originating_module: Optional[str],
    key: str,
) -> None:
    """Pair each constructor parameter of the dataclass composite with a
    column argument, position by position.

    Positions with no column argument get a fresh ``MappedColumn`` that is
    appended to ``self.attrs``.  ``MappedColumn`` arguments are scanned
    with the parameter's name and annotation; plain ``Column`` arguments
    are passed to ``_undefer_column_name()`` with the parameter name.

    :raises sqlalchemy.exc.ArgumentError: if more column arguments were
     given than the composite class has constructor parameters.
    """
    MappedColumn = util.preloaded.orm_properties.MappedColumn
    decl_base = util.preloaded.orm_decl_base

    insp = inspect.signature(self.composite_class)
    pairs = itertools.zip_longest(insp.parameters.values(), self.attrs)

    for param, attr in pairs:
        if param is None:
            # ran out of constructor parameters before column arguments
            raise sa_exc.ArgumentError(
                f"number of composite attributes "
                f"{len(self.attrs)} exceeds "
                f"that of the number of attributes in class "
                f"{self.composite_class.__name__} {len(insp.parameters)}"
            )

        if attr is None:
            # fill in missing attr spots with empty MappedColumn
            attr = MappedColumn()
            self.attrs = self.attrs + (attr,)

        if isinstance(attr, MappedColumn):
            attr.declarative_scan_for_composite(
                decl_scan,
                registry,
                cls,
                originating_module,
                key,
                param.name,
                param.annotation,
            )
        elif isinstance(attr, schema.Column):
            decl_base._undefer_column_name(param.name, attr)

489 

@util.memoized_property
def _comparable_elements(self) -> Sequence[QueryableAttribute[Any]]:
    """Class-level attributes of the parent class, one per entry in
    ``self.props`` and in the same order (memoized)."""
    parent_cls = self.parent.class_
    return [getattr(parent_cls, prop.key) for prop in self.props]

493 

@util.memoized_property
@util.preload_module("orm.properties")
def props(self) -> Sequence[MapperProperty[Any]]:
    """Resolve every entry of ``self.attrs`` to its ``MapperProperty`` on
    the parent mapper (memoized).

    :raises sqlalchemy.exc.ArgumentError: if an entry does not resolve to
     a ``MapperProperty``.
    """
    MappedColumn = util.preloaded.orm_properties.MappedColumn
    parent = self.parent

    def resolve(attr: Any) -> Any:
        # returns None for unsupported argument types
        if isinstance(attr, str):
            return parent.get_property(attr, _configure_mappers=False)
        if isinstance(attr, schema.Column):
            return parent._columntoproperty[attr]
        if isinstance(attr, MappedColumn):
            return parent._columntoproperty[attr.column]
        if isinstance(attr, attributes.InstrumentedAttribute):
            return attr.property
        return None

    resolved = []
    for attr in self.attrs:
        prop = resolve(attr)
        if not isinstance(prop, MapperProperty):
            raise sa_exc.ArgumentError(
                "Composite expects Column objects or mapped "
                f"attributes/attribute names as arguments, got: {attr!r}"
            )
        resolved.append(prop)
    return resolved

520 

def _column_strategy_attrs(self) -> Sequence[QueryableAttribute[Any]]:
    """Return the same sequence as ``_comparable_elements``."""
    return self._comparable_elements

523 

@util.non_memoized_property
@util.preload_module("orm.properties")
def columns(self) -> Sequence[Column[Any]]:
    """Column objects found among ``self.attrs``.

    ``MappedColumn`` entries contribute their ``.column``; ``Column``
    entries are returned as-is; every other kind of entry is skipped.
    """
    MappedColumn = util.preloaded.orm_properties.MappedColumn

    found = []
    for attr in self.attrs:
        if isinstance(attr, MappedColumn):
            found.append(attr.column)
        elif isinstance(attr, schema.Column):
            found.append(attr)
    return found

533 

@property
def mapper_property_to_assign(self) -> Optional[MapperProperty[_CC]]:
    """The composite itself is the property to assign; returns ``self``."""
    return self

537 

@property
def columns_to_assign(self) -> List[Tuple[schema.Column[Any], int]]:
    """``(column, 0)`` pairs for each of ``self.columns`` whose ``table``
    is None."""
    unattached = (col for col in self.columns if col.table is None)
    return [(col, 0) for col in unattached]

541 

@util.preload_module("orm.properties")
def _setup_arguments_on_columns(self) -> None:
    """Copy this composite's ``active_history``, ``deferred`` and ``group``
    settings onto each of its ``ColumnProperty`` members.

    Properties of any other kind are left untouched.  ``deferred`` and the
    deferred strategy key are applied only when ``self.deferred`` is true;
    ``active_history`` and ``group`` are always assigned.

    """
    ColumnProperty = util.preloaded.orm_properties.ColumnProperty

    column_props = (
        prop for prop in self.props if isinstance(prop, ColumnProperty)
    )
    for cprop in column_props:
        cprop.active_history = self.active_history
        if self.deferred:
            cprop.deferred = self.deferred
            cprop.strategy_key = (("deferred", True), ("instrument", True))
        cprop.group = self.group

561 

def _setup_event_handlers(self) -> None:
    """Establish events that populate/expire the composite attribute.

    Listeners are registered on ``self.parent`` for ``after_insert``,
    ``after_update``, ``load``, ``refresh`` and ``expire``; finally the
    proxied attribute's impl is given the attribute's dispatcher and this
    property's ``active_history`` flag.
    """

    def _load_refresh_handler(
        state: InstanceState[Any],
        context: _ORMCompileState,
        to_load: Optional[Sequence[str]],
        is_refresh: bool,
    ) -> None:
        dict_ = state.dict

        # if context indicates we are coming from the
        # fget() handler, this already set the value; skip the
        # handler here. (other handlers like mutablecomposite will still
        # want to catch it)
        # there's an insufficiency here in that the fget() handler
        # really should not be using the refresh event and there should
        # be some other event that mutablecomposite can subscribe
        # towards for this.
        from_fget = context is self._COMPOSITE_FGET
        if (not is_refresh or from_fget) and self.key in dict_:
            return

        # if column elements aren't loaded, skip.
        # __get__() will initiate a load for those
        # columns
        if any(k not in dict_ for k in self._attribute_keys):
            return

        dict_[self.key] = self.composite_class(
            *[state.dict[key] for key in self._attribute_keys]
        )

    def load_handler(
        state: InstanceState[Any], context: _ORMCompileState
    ) -> None:
        _load_refresh_handler(state, context, None, is_refresh=False)

    def refresh_handler(
        state: InstanceState[Any],
        context: _ORMCompileState,
        to_load: Optional[Sequence[str]],
    ) -> None:
        # note this corresponds to sqlalchemy.ext.mutable load_attrs()

        if to_load:
            # act only when the composite or one of its columns is named
            watched = {self.key}.union(self._attribute_keys)
            if not watched.intersection(to_load):
                return
        _load_refresh_handler(state, context, to_load, is_refresh=True)

    def expire_handler(
        state: InstanceState[Any], keys: Optional[Sequence[str]]
    ) -> None:
        if keys is None or set(self._attribute_keys).intersection(keys):
            state.dict.pop(self.key, None)

    def insert_update_handler(
        mapper: Mapper[Any],
        connection: Connection,
        state: InstanceState[Any],
    ) -> None:
        """After an insert or update, some columns may be expired due
        to server side defaults, or re-populated due to client side
        defaults.  Pop out the composite value here so that it
        recreates.

        """

        state.dict.pop(self.key, None)

    # (event name, handler, extra listen() options), in registration order
    listeners = (
        ("after_insert", insert_update_handler, {}),
        ("after_update", insert_update_handler, {}),
        ("load", load_handler, {"propagate": True}),
        ("refresh", refresh_handler, {"propagate": True}),
        ("expire", expire_handler, {"propagate": True}),
    )
    for event_name, handler, options in listeners:
        event.listen(self.parent, event_name, handler, raw=True, **options)

    proxy_attr = self.parent.class_manager[self.key]
    proxy_attr.impl.dispatch = proxy_attr.dispatch  # type: ignore
    proxy_attr.impl.dispatch._active_history = self.active_history

    # TODO: need a deserialize hook here

656 

@util.memoized_property
def _attribute_keys(self) -> Sequence[str]:
    """The ``key`` of every property in ``self.props``, in order
    (memoized)."""
    keys = []
    for prop in self.props:
        keys.append(prop.key)
    return keys

660 

def _populate_composite_bulk_save_mappings_fn(
    self,
) -> Callable[[Dict[str, Any]], None]:
    """Return a function that expands this composite inside a mapping.

    The returned ``populate(dest_dict)`` pops the entry stored under
    ``self.key`` (raising ``KeyError`` if it is missing), extracts its
    column values, and writes them back into ``dest_dict`` under the keys
    of the member properties.

    :return: a callable that mutates the given dictionary in place.
    """
    if self._generated_composite_accessor:
        get_values = self._generated_composite_accessor
    else:

        # annotated as a variable-length tuple, matching the other
        # composite value accessors (it was ``Tuple[Any]``, a 1-tuple)
        def get_values(val: Any) -> Tuple[Any, ...]:
            return val.__composite_values__()  # type: ignore

    attrs = [prop.key for prop in self.props]

    def populate(dest_dict: Dict[str, Any]) -> None:
        # the composite entry is removed before the column keys are added
        values = get_values(dest_dict.pop(self.key))
        dest_dict.update(zip(attrs, values))

    return populate

684 

def get_history(
    self,
    state: InstanceState[Any],
    dict_: _InstanceDict,
    passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
) -> History:
    """Provided for userland code that uses attributes.get_history().

    Collects the history of each member attribute and assembles composite
    objects from the pieces; a member with no value on one side
    contributes ``None`` on that side.

    :return: ``History(added, (), deleted)`` if any member reports
     changes, otherwise ``History((), unchanged, ())``.
    """
    # NOTE(review): ``passive`` is accepted but not forwarded to the
    # per-attribute get_history() calls below -- confirm this is intended.

    added: List[Any] = []
    deleted: List[Any] = []
    has_history = False

    for prop in self.props:
        hist = state.manager[prop.key].impl.get_history(state, dict_)
        if hist.has_changes():
            has_history = True

        current = hist.non_deleted()
        added.extend(current if current else [None])

        removed = hist.deleted
        deleted.extend(removed if removed else [None])

    if not has_history:
        return attributes.History((), [self.composite_class(*added)], ())

    return attributes.History(
        [self.composite_class(*added)],
        (),
        [self.composite_class(*deleted)],
    )

721 

def _comparator_factory(
    self, mapper: Mapper[Any]
) -> Composite.Comparator[_CC]:
    """Instantiate ``self.comparator_factory`` for this property and the
    given mapper."""
    factory = self.comparator_factory
    return factory(self, mapper)

726 

class CompositeBundle(orm_util.Bundle[_T]):
    """A ``Bundle`` whose rows are turned into composite objects."""

    def __init__(
        self,
        property_: Composite[_T],
        expr: ClauseList,
    ):
        """
        :param property_: the composite property; its ``key`` names the
         bundle and its ``composite_class`` builds the row values.
        :param expr: clause list whose elements become the bundle's
         expressions.
        """
        self.property = property_
        super().__init__(property_.key, *expr)

    def create_row_processor(
        self,
        query: Select[Unpack[TupleAny]],
        procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]],
        labels: Sequence[str],
    ) -> Callable[[Row[Unpack[TupleAny]]], Any]:
        """Return a row function that applies each of ``procs`` to the row
        and passes the results positionally to the composite class."""

        def proc(row: Row[Unpack[TupleAny]]) -> Any:
            column_values = [getter(row) for getter in procs]
            return self.property.composite_class(*column_values)

        return proc

748 

class Comparator(PropComparator[_PT]):
    """Produce boolean, comparison, and other operators for
    :class:`.Composite` attributes.

    See the example in :ref:`composite_operations` for an overview
    of usage, as well as the documentation for :class:`.PropComparator`.

    .. seealso::

        :class:`.PropComparator`

        :class:`.ColumnOperators`

        :ref:`types_operators`

        :attr:`.TypeEngine.comparator_factory`

    """

    # https://github.com/python/mypy/issues/4266
    __hash__ = None  # type: ignore

    prop: RODescriptorReference[Composite[_PT]]

    @util.memoized_property
    def clauses(self) -> ClauseList:
        """Ungrouped clause list of the comparable column attributes."""
        return expression.ClauseList(*self._comparable_elements, group=False)

    def __clause_element__(self) -> CompositeProperty.CompositeBundle[_PT]:
        """Return :attr:`expression`."""
        return self.expression

    @util.memoized_property
    def expression(self) -> CompositeProperty.CompositeBundle[_PT]:
        """The ``CompositeBundle`` built from the annotated clauses."""
        annotations = {
            "parententity": self._parententity,
            "parentmapper": self._parententity,
            "proxy_key": self.prop.key,
        }
        clauses = self.clauses._annotate(annotations)
        return CompositeProperty.CompositeBundle(self.prop, clauses)

    def _bulk_update_tuples(
        self, value: Any
    ) -> Sequence[Tuple[_DMLColumnArgument, Any]]:
        """Pair each comparable column attribute with its new value.

        :param value: ``None``, an instance of the composite class, or a
         ``BindParameter`` wrapping either.
        :raises sqlalchemy.exc.ArgumentError: for any other value.
        """
        if isinstance(value, BindParameter):
            value = value.value

        values: Sequence[Any]
        composite_class = self.prop.composite_class

        if value is None:
            values = [None] * len(self.prop._attribute_keys)
        elif isinstance(composite_class, type) and isinstance(
            value, composite_class
        ):
            values = self.prop._composite_values_from_instance(
                value  # type: ignore[arg-type]
            )
        else:
            raise sa_exc.ArgumentError(
                "Can't UPDATE composite attribute %s to %r"
                % (self.prop, value)
            )

        return list(zip(self._comparable_elements, values))

    def _bulk_dml_setter(self, key: str) -> Optional[Callable[..., Any]]:
        """Return the property's bulk-mapping populate function; ``key``
        is not used here."""
        return self.prop._populate_composite_bulk_save_mappings_fn()

    @util.memoized_property
    def _comparable_elements(self) -> Sequence[QueryableAttribute[Any]]:
        """Comparable attributes, taken from the adapted entity when this
        comparator is adapted."""
        if not self._adapt_to_entity:
            return self.prop._comparable_elements

        entity = self._adapt_to_entity.entity
        return [
            getattr(entity, prop.key)
            for prop in self.prop._comparable_elements
        ]

    def __eq__(self, other: Any) -> ColumnElement[bool]:  # type: ignore[override]  # noqa: E501
        return self._compare(operators.eq, other)

    def __ne__(self, other: Any) -> ColumnElement[bool]:  # type: ignore[override]  # noqa: E501
        return self._compare(operators.ne, other)

    def __lt__(self, other: Any) -> ColumnElement[bool]:
        return self._compare(operators.lt, other)

    def __gt__(self, other: Any) -> ColumnElement[bool]:
        return self._compare(operators.gt, other)

    def __le__(self, other: Any) -> ColumnElement[bool]:
        return self._compare(operators.le, other)

    def __ge__(self, other: Any) -> ColumnElement[bool]:
        return self._compare(operators.ge, other)

    # what might be interesting would be if we create
    # an instance of the composite class itself with
    # the columns as data members, then use "hybrid style" comparison
    # to create these comparisons.  then your Point.__eq__() method could
    # be where comparison behavior is defined for SQL also.   Likely
    # not a good choice for default behavior though, not clear how it would
    # work w/ dataclasses, etc.  also no demand for any of this anyway.
    def _compare(
        self, operator: OperatorType, other: Any
    ) -> ColumnElement[bool]:
        """AND together ``operator(column, value)`` for every column.

        :param operator: binary operator applied to each pair.
        :param other: a composite instance, or ``None`` to compare every
         column against ``None``.
        """
        columns = self.prop._comparable_elements

        values: Sequence[Any]
        if other is None:
            values = [None] * len(columns)
        else:
            values = self.prop._composite_values_from_instance(other)

        comparisons = [operator(a, b) for a, b in zip(columns, values)]

        if self._adapt_to_entity:
            assert self.adapter is not None
            comparisons = [self.adapter(x) for x in comparisons]
        return sql.and_(*comparisons)

871 

def __str__(self) -> str:
    """Return ``"<parent class name>.<key>"``."""
    class_name = str(self.parent.class_.__name__)
    return class_name + "." + self.key

874 

875 

class Composite(CompositeProperty[_T], _DeclarativeMapped[_T]):
    """Declarative-compatible subclass of :class:`.CompositeProperty`.

    Adds no behavior beyond its base classes apart from setting
    ``inherit_cache``.

    Public constructor is the :func:`_orm.composite` function.

    .. versionchanged:: 2.0 Added :class:`_orm.Composite` as a Declarative
       compatible subclass of :class:`_orm.CompositeProperty`.

    .. seealso::

        :ref:`mapper_composite`

    """

    inherit_cache = True
    """:meta private:"""

893 

894 

class ConcreteInheritedProperty(DescriptorProperty[_T]):
    """A 'do nothing' :class:`.MapperProperty` that disables
    an attribute on a concrete subclass that is only present
    on the inherited mapper, not the concrete classes' mapper.

    Cases where this occurs include:

    * When the superclass mapper is mapped against a
      "polymorphic union", which includes all attributes from
      all subclasses.
    * When a relationship() is configured on an inherited mapper,
      but not on the subclass mapper.  Concrete mappers require
      that relationship() is configured explicitly on each
      subclass.

    """

    def _comparator_factory(
        self, mapper: Mapper[Any]
    ) -> Type[PropComparator[_T]]:
        """Build a comparator from the first property of the same key, up
        the mapper hierarchy, that has a non-None ``comparator_factory``.

        An ``assert`` guards against no such property being found.
        """
        comparator_callable = None

        for ancestor in self.parent.iterate_to_root():
            candidate = ancestor._props[self.key]
            if getattr(candidate, "comparator_factory", None) is not None:
                comparator_callable = candidate.comparator_factory
                break
        assert comparator_callable is not None
        # ``candidate`` is the property on which the loop stopped
        return comparator_callable(candidate, mapper)  # type: ignore

    def __init__(self) -> None:
        """Install a descriptor that raises AttributeError for any
        instance-level get, set or delete."""
        super().__init__()

        def warn() -> NoReturn:
            raise AttributeError(
                "Concrete %s does not implement "
                "attribute %r at the instance level. Add "
                "this property explicitly to %s."
                % (self.parent, self.key, self.parent)
            )

        class NoninheritedConcreteProp:
            # ``s`` is the descriptor instance; ``self`` in these bodies
            # is the enclosing ConcreteInheritedProperty

            def __get__(s: Any, obj: Any, owner: Any) -> Any:
                # class-level access hands back the descriptor itself
                if obj is None:
                    return self.descriptor
                warn()

            def __set__(s: Any, obj: Any, value: Any) -> NoReturn:
                warn()

            def __delete__(s: Any, obj: Any) -> NoReturn:
                warn()

        self.descriptor = NoninheritedConcreteProp()

949 

950 

class SynonymProperty(DescriptorProperty[_T]):
    """Make an attribute name act as an alias of another mapped attribute.

    The synonym mirrors both the value behavior and the expression
    behavior of the attribute it names.

    :class:`.Synonym` is constructed using the :func:`_orm.synonym`
    function.

    .. seealso::

        :ref:`synonyms` - Overview of synonyms

    """

    comparator_factory: Optional[Type[PropComparator[_T]]]

    def __init__(
        self,
        name: str,
        map_column: Optional[bool] = None,
        descriptor: Optional[Any] = None,
        comparator_factory: Optional[Type[PropComparator[_T]]] = None,
        attribute_options: Optional[_AttributeOptions] = None,
        info: Optional[_InfoType] = None,
        doc: Optional[str] = None,
    ):
        """Record the synonym's configuration.

        :param name: name of the attribute this synonym proxies.
        :param map_column: when true, :meth:`set_parent` maps the column
         keyed by this synonym's key under ``name``.
        :param descriptor: optional descriptor object; its ``__doc__`` is
         used as the documentation string when ``doc`` is not given.
        :param comparator_factory: optional comparator class used in place
         of the proxied property's own comparator factory.
        :param attribute_options: passed through to the parent initializer.
        :param info: optional mapping merged into ``self.info``.
        :param doc: optional documentation string; takes precedence over
         the descriptor's ``__doc__``.
        """
        super().__init__(attribute_options=attribute_options)

        self.name = name
        self.map_column = map_column
        self.descriptor = descriptor
        self.comparator_factory = comparator_factory

        # an explicit ``doc`` wins; otherwise fall back to the
        # descriptor's docstring, and to None when neither is usable
        if doc:
            self.doc = doc
        else:
            descriptor_doc = descriptor.__doc__ if descriptor else None
            self.doc = descriptor_doc or None

        if info:
            self.info.update(info)

        util.set_creation_order(self)

    if not TYPE_CHECKING:

        @property
        def uses_objects(self) -> bool:
            """Report the ``uses_objects`` flag of the proxied attribute."""
            proxied_attr = getattr(self.parent.class_, self.name)
            return proxied_attr.impl.uses_objects

    # TODO: when initialized, check _proxied_object,
    # emit a warning if it's not a column-based property

    @util.memoized_property
    def _proxied_object(
        self,
    ) -> Union[MapperProperty[_T], SQLORMOperations[_T]]:
        """Resolve the object that this synonym stands in for.

        Returns the ``MapperProperty`` of the named class attribute when
        it has one; otherwise the attribute's comparator (for a
        ``QueryableAttribute``) or the attribute itself (for any other
        ``SQLORMOperations``).  Anything else raises
        ``InvalidRequestError``.
        """
        attr = getattr(self.parent.class_, self.name)

        if hasattr(attr, "property") and isinstance(
            attr.property, MapperProperty
        ):
            return attr.property

        # attribute is a non-MapperProperty proxy such as
        # hybrid or association proxy
        if isinstance(attr, attributes.QueryableAttribute):
            return attr.comparator
        if isinstance(attr, SQLORMOperations):
            # association proxy comes here
            return attr

        raise sa_exc.InvalidRequestError(
            """synonym() attribute "%s.%s" only supports """
            """ORM mapped attributes, got %r"""
            % (self.parent.class_.__name__, self.name, attr)
        )

    def _column_strategy_attrs(self) -> Sequence[QueryableAttribute[Any]]:
        """Return a one-element tuple holding the proxied class attribute."""
        proxied_attr = getattr(self.parent.class_, self.name)
        return (proxied_attr,)

    def _comparator_factory(self, mapper: Mapper[Any]) -> SQLORMOperations[_T]:
        """Build the comparator used for this synonym on ``mapper``.

        When the proxied object is not a ``MapperProperty`` it is returned
        unchanged.  Otherwise the synonym's own ``comparator_factory`` is
        used if set, falling back to that of the proxied property.
        """
        prop = self._proxied_object

        if not isinstance(prop, MapperProperty):
            return prop

        factory = self.comparator_factory or prop.comparator_factory
        return factory(prop, mapper)

    def get_history(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
    ) -> History:
        """Delegate history retrieval to the proxied attribute's ``impl``."""
        attr: QueryableAttribute[Any] = getattr(self.parent.class_, self.name)
        impl = attr.impl
        return impl.get_history(state, dict_, passive=passive)

    def _get_dataclass_setup_options(
        self,
        decl_scan: _ClassScanMapperConfig,
        key: str,
        dataclass_setup_arguments: _DataclassArguments,
    ) -> _AttributeOptions:
        """Return the attribute options to use for dataclass setup.

        When a non-callable dataclasses default was given for the synonym,
        and the scanned class does not set
        ``_sa_disable_descriptor_defaults``, the default must equal the one
        configured on the proxied attribute.  It is then stored in
        ``_default_scalar_value`` and replaced by ``DONT_SET`` in the
        returned options.  In every other case the options are returned
        untouched.

        :raises ArgumentError: if the synonym's default differs from the
         proxied attribute's default.
        """
        synonym_default = self._attribute_options.dataclasses_default

        # nothing to reconcile: no default given, a callable default, or
        # the class opted out of descriptor defaults
        if (
            synonym_default is _NoArg.NO_ARG
            or callable(synonym_default)
            or getattr(decl_scan.cls, "_sa_disable_descriptor_defaults", False)
        ):
            return self._attribute_options

        proxied = decl_scan.collected_attributes[self.name]
        proxied_default = proxied._attribute_options.dataclasses_default

        if proxied_default != synonym_default:
            current = (
                repr(proxied_default)
                if proxied_default is not _NoArg.NO_ARG
                else "not set"
            )
            raise sa_exc.ArgumentError(
                f"Synonym {key!r} default argument "
                f"{synonym_default!r} must match the dataclasses "
                f"default value of proxied object {self.name!r}, "
                f"currently {current}"
            )

        self._default_scalar_value = synonym_default
        return self._attribute_options._replace(dataclasses_default=DONT_SET)

    @util.preload_module("sqlalchemy.orm.properties")
    def set_parent(self, parent: Mapper[Any], init: bool) -> None:
        """Attach this synonym to ``parent``, honoring ``map_column``.

        With ``map_column`` set, the column of the mapper's persist
        selectable keyed by this synonym's key is configured on the mapper
        as a ``ColumnProperty`` under ``self.name``, and that property is
        marked as mapped by this synonym.

        :raises ArgumentError: if no such column exists, or if a property
         keyed to ``self.name`` is already mapped to that column.
        """
        properties = util.preloaded.orm_properties

        if self.map_column:
            # implement the 'map_column' option.
            columns = parent.persist_selectable.c

            if self.key not in columns:
                raise sa_exc.ArgumentError(
                    "Can't compile synonym '%s': no column on table "
                    "'%s' named '%s'"
                    % (
                        self.name,
                        parent.persist_selectable.description,
                        self.key,
                    )
                )

            column = columns[self.key]
            column_to_prop = parent._columntoproperty

            if (
                column in column_to_prop
                and column_to_prop[column].key == self.name
            ):
                raise sa_exc.ArgumentError(
                    "Can't call map_column=True for synonym %r=%r, "
                    "a ColumnProperty already exists keyed to the name "
                    "%r for column %r"
                    % (self.key, self.name, self.name, self.key)
                )

            column_prop: ColumnProperty[Any] = properties.ColumnProperty(
                column
            )
            parent._configure_property(
                self.name, column_prop, init=init, setparent=True
            )
            column_prop._mapped_by_synonym = self.key

        self.parent = parent

1120 

1121 

class Synonym(SynonymProperty[_T], _DeclarativeMapped[_T]):
    """Declarative front-end for the :class:`.SynonymProperty` class.

    Public constructor is the :func:`_orm.synonym` function.

    The class body only sets ``inherit_cache``; all other behavior comes
    from its two base classes, :class:`.SynonymProperty` and
    ``_DeclarativeMapped``.

    .. versionchanged:: 2.0 Added :class:`_orm.Synonym` as a Declarative
       compatible subclass for :class:`_orm.SynonymProperty`

    .. seealso::

        :ref:`synonyms` - Overview of synonyms

    """

    # NOTE(review): presumably tells the SQL caching machinery that this
    # subclass can reuse its parent's cache-key scheme - confirm against
    # the definition of ``inherit_cache`` elsewhere in the package.
    inherit_cache = True
    """:meta private:"""