Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/descriptor_props.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

485 statements  

1# orm/descriptor_props.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Descriptor properties are more "auxiliary" properties 

9that exist as configurational elements, but don't participate 

10as actively in the load/persist ORM loop. 

11 

12""" 

13from __future__ import annotations 

14 

15from dataclasses import is_dataclass 

16import inspect 

17import itertools 

18import operator 

19import typing 

20from typing import Any 

21from typing import Callable 

22from typing import Dict 

23from typing import List 

24from typing import NoReturn 

25from typing import Optional 

26from typing import Sequence 

27from typing import Tuple 

28from typing import Type 

29from typing import TYPE_CHECKING 

30from typing import TypeVar 

31from typing import Union 

32import weakref 

33 

34from . import attributes 

35from . import util as orm_util 

36from .base import _DeclarativeMapped 

37from .base import LoaderCallableStatus 

38from .base import Mapped 

39from .base import PassiveFlag 

40from .base import SQLORMOperations 

41from .interfaces import _AttributeOptions 

42from .interfaces import _IntrospectsAnnotations 

43from .interfaces import _MapsColumns 

44from .interfaces import MapperProperty 

45from .interfaces import PropComparator 

46from .util import _none_set 

47from .util import de_stringify_annotation 

48from .. import event 

49from .. import exc as sa_exc 

50from .. import schema 

51from .. import sql 

52from .. import util 

53from ..sql import expression 

54from ..sql import operators 

55from ..sql.elements import BindParameter 

56from ..util.typing import get_args 

57from ..util.typing import is_fwd_ref 

58from ..util.typing import is_pep593 

59 

60 

61if typing.TYPE_CHECKING: 

62 from ._typing import _InstanceDict 

63 from ._typing import _RegistryType 

64 from .attributes import History 

65 from .attributes import InstrumentedAttribute 

66 from .attributes import QueryableAttribute 

67 from .context import ORMCompileState 

68 from .decl_base import _ClassScanMapperConfig 

69 from .mapper import Mapper 

70 from .properties import ColumnProperty 

71 from .properties import MappedColumn 

72 from .state import InstanceState 

73 from ..engine.base import Connection 

74 from ..engine.row import Row 

75 from ..sql._typing import _DMLColumnArgument 

76 from ..sql._typing import _InfoType 

77 from ..sql.elements import ClauseList 

78 from ..sql.elements import ColumnElement 

79 from ..sql.operators import OperatorType 

80 from ..sql.schema import Column 

81 from ..sql.selectable import Select 

82 from ..util.typing import _AnnotationScanType 

83 from ..util.typing import CallableReference 

84 from ..util.typing import DescriptorReference 

85 from ..util.typing import RODescriptorReference 

86 

87_T = TypeVar("_T", bound=Any) 

88_PT = TypeVar("_PT", bound=Any) 

89 

90 

91class DescriptorProperty(MapperProperty[_T]): 

92 """:class:`.MapperProperty` which proxies access to a 

93 user-defined descriptor.""" 

94 

95 doc: Optional[str] = None 

96 

97 uses_objects = False 

98 _links_to_entity = False 

99 

100 descriptor: DescriptorReference[Any] 

101 

102 def _column_strategy_attrs(self) -> Sequence[QueryableAttribute[Any]]: 

103 raise NotImplementedError( 

104 "This MapperProperty does not implement column loader strategies" 

105 ) 

106 

107 def get_history( 

108 self, 

109 state: InstanceState[Any], 

110 dict_: _InstanceDict, 

111 passive: PassiveFlag = PassiveFlag.PASSIVE_OFF, 

112 ) -> History: 

113 raise NotImplementedError() 

114 

115 def instrument_class(self, mapper: Mapper[Any]) -> None: 

116 prop = self 

117 

118 class _ProxyImpl(attributes.AttributeImpl): 

119 accepts_scalar_loader = False 

120 load_on_unexpire = True 

121 collection = False 

122 

123 @property 

124 def uses_objects(self) -> bool: # type: ignore 

125 return prop.uses_objects 

126 

127 def __init__(self, key: str): 

128 self.key = key 

129 

130 def get_history( 

131 self, 

132 state: InstanceState[Any], 

133 dict_: _InstanceDict, 

134 passive: PassiveFlag = PassiveFlag.PASSIVE_OFF, 

135 ) -> History: 

136 return prop.get_history(state, dict_, passive) 

137 

138 if self.descriptor is None: 

139 desc = getattr(mapper.class_, self.key, None) 

140 if mapper._is_userland_descriptor(self.key, desc): 

141 self.descriptor = desc 

142 

143 if self.descriptor is None: 

144 

145 def fset(obj: Any, value: Any) -> None: 

146 setattr(obj, self.name, value) 

147 

148 def fdel(obj: Any) -> None: 

149 delattr(obj, self.name) 

150 

151 def fget(obj: Any) -> Any: 

152 return getattr(obj, self.name) 

153 

154 self.descriptor = property(fget=fget, fset=fset, fdel=fdel) 

155 

156 proxy_attr = attributes.create_proxied_attribute(self.descriptor)( 

157 self.parent.class_, 

158 self.key, 

159 self.descriptor, 

160 lambda: self._comparator_factory(mapper), 

161 doc=self.doc, 

162 original_property=self, 

163 ) 

164 proxy_attr.impl = _ProxyImpl(self.key) 

165 mapper.class_manager.instrument_attribute(self.key, proxy_attr) 

166 

167 

168_CompositeAttrType = Union[ 

169 str, 

170 "Column[_T]", 

171 "MappedColumn[_T]", 

172 "InstrumentedAttribute[_T]", 

173 "Mapped[_T]", 

174] 

175 

176 

177_CC = TypeVar("_CC", bound=Any) 

178 

179 

180_composite_getters: weakref.WeakKeyDictionary[ 

181 Type[Any], Callable[[Any], Tuple[Any, ...]] 

182] = weakref.WeakKeyDictionary() 

183 

184 

185class CompositeProperty( 

186 _MapsColumns[_CC], _IntrospectsAnnotations, DescriptorProperty[_CC] 

187): 

188 """Defines a "composite" mapped attribute, representing a collection 

189 of columns as one attribute. 

190 

191 :class:`.CompositeProperty` is constructed using the :func:`.composite` 

192 function. 

193 

194 .. seealso:: 

195 

196 :ref:`mapper_composite` 

197 

198 """ 

199 

200 composite_class: Union[Type[_CC], Callable[..., _CC]] 

201 attrs: Tuple[_CompositeAttrType[Any], ...] 

202 

203 _generated_composite_accessor: CallableReference[ 

204 Optional[Callable[[_CC], Tuple[Any, ...]]] 

205 ] 

206 

207 comparator_factory: Type[Comparator[_CC]] 

208 

209 def __init__( 

210 self, 

211 _class_or_attr: Union[ 

212 None, Type[_CC], Callable[..., _CC], _CompositeAttrType[Any] 

213 ] = None, 

214 *attrs: _CompositeAttrType[Any], 

215 attribute_options: Optional[_AttributeOptions] = None, 

216 active_history: bool = False, 

217 deferred: bool = False, 

218 group: Optional[str] = None, 

219 comparator_factory: Optional[Type[Comparator[_CC]]] = None, 

220 info: Optional[_InfoType] = None, 

221 **kwargs: Any, 

222 ): 

223 super().__init__(attribute_options=attribute_options) 

224 

225 if isinstance(_class_or_attr, (Mapped, str, sql.ColumnElement)): 

226 self.attrs = (_class_or_attr,) + attrs 

227 # will initialize within declarative_scan 

228 self.composite_class = None # type: ignore 

229 else: 

230 self.composite_class = _class_or_attr # type: ignore 

231 self.attrs = attrs 

232 

233 self.active_history = active_history 

234 self.deferred = deferred 

235 self.group = group 

236 self.comparator_factory = ( 

237 comparator_factory 

238 if comparator_factory is not None 

239 else self.__class__.Comparator 

240 ) 

241 self._generated_composite_accessor = None 

242 if info is not None: 

243 self.info.update(info) 

244 

245 util.set_creation_order(self) 

246 self._create_descriptor() 

247 self._init_accessor() 

248 

249 def instrument_class(self, mapper: Mapper[Any]) -> None: 

250 super().instrument_class(mapper) 

251 self._setup_event_handlers() 

252 

253 def _composite_values_from_instance(self, value: _CC) -> Tuple[Any, ...]: 

254 if self._generated_composite_accessor: 

255 return self._generated_composite_accessor(value) 

256 else: 

257 try: 

258 accessor = value.__composite_values__ 

259 except AttributeError as ae: 

260 raise sa_exc.InvalidRequestError( 

261 f"Composite class {self.composite_class.__name__} is not " 

262 f"a dataclass and does not define a __composite_values__()" 

263 " method; can't get state" 

264 ) from ae 

265 else: 

266 return accessor() # type: ignore 

267 

268 def do_init(self) -> None: 

269 """Initialization which occurs after the :class:`.Composite` 

270 has been associated with its parent mapper. 

271 

272 """ 

273 self._setup_arguments_on_columns() 

274 

275 _COMPOSITE_FGET = object() 

276 

277 def _create_descriptor(self) -> None: 

278 """Create the Python descriptor that will serve as 

279 the access point on instances of the mapped class. 

280 

281 """ 

282 

283 def fget(instance: Any) -> Any: 

284 dict_ = attributes.instance_dict(instance) 

285 state = attributes.instance_state(instance) 

286 

287 if self.key not in dict_: 

288 # key not present. Iterate through related 

289 # attributes, retrieve their values. This 

290 # ensures they all load. 

291 values = [ 

292 getattr(instance, key) for key in self._attribute_keys 

293 ] 

294 

295 # current expected behavior here is that the composite is 

296 # created on access if the object is persistent or if 

297 # col attributes have non-None. This would be better 

298 # if the composite were created unconditionally, 

299 # but that would be a behavioral change. 

300 if self.key not in dict_ and ( 

301 state.key is not None or not _none_set.issuperset(values) 

302 ): 

303 dict_[self.key] = self.composite_class(*values) 

304 state.manager.dispatch.refresh( 

305 state, self._COMPOSITE_FGET, [self.key] 

306 ) 

307 

308 return dict_.get(self.key, None) 

309 

310 def fset(instance: Any, value: Any) -> None: 

311 dict_ = attributes.instance_dict(instance) 

312 state = attributes.instance_state(instance) 

313 attr = state.manager[self.key] 

314 

315 if attr.dispatch._active_history: 

316 previous = fget(instance) 

317 else: 

318 previous = dict_.get(self.key, LoaderCallableStatus.NO_VALUE) 

319 

320 for fn in attr.dispatch.set: 

321 value = fn(state, value, previous, attr.impl) 

322 dict_[self.key] = value 

323 if value is None: 

324 for key in self._attribute_keys: 

325 setattr(instance, key, None) 

326 else: 

327 for key, value in zip( 

328 self._attribute_keys, 

329 self._composite_values_from_instance(value), 

330 ): 

331 setattr(instance, key, value) 

332 

333 def fdel(instance: Any) -> None: 

334 state = attributes.instance_state(instance) 

335 dict_ = attributes.instance_dict(instance) 

336 attr = state.manager[self.key] 

337 

338 if attr.dispatch._active_history: 

339 previous = fget(instance) 

340 dict_.pop(self.key, None) 

341 else: 

342 previous = dict_.pop(self.key, LoaderCallableStatus.NO_VALUE) 

343 

344 attr = state.manager[self.key] 

345 attr.dispatch.remove(state, previous, attr.impl) 

346 for key in self._attribute_keys: 

347 setattr(instance, key, None) 

348 

349 self.descriptor = property(fget, fset, fdel) 

350 

351 @util.preload_module("sqlalchemy.orm.properties") 

352 def declarative_scan( 

353 self, 

354 decl_scan: _ClassScanMapperConfig, 

355 registry: _RegistryType, 

356 cls: Type[Any], 

357 originating_module: Optional[str], 

358 key: str, 

359 mapped_container: Optional[Type[Mapped[Any]]], 

360 annotation: Optional[_AnnotationScanType], 

361 extracted_mapped_annotation: Optional[_AnnotationScanType], 

362 is_dataclass_field: bool, 

363 ) -> None: 

364 MappedColumn = util.preloaded.orm_properties.MappedColumn 

365 if ( 

366 self.composite_class is None 

367 and extracted_mapped_annotation is None 

368 ): 

369 self._raise_for_required(key, cls) 

370 argument = extracted_mapped_annotation 

371 

372 if is_pep593(argument): 

373 argument = get_args(argument)[0] 

374 

375 if argument and self.composite_class is None: 

376 if isinstance(argument, str) or is_fwd_ref( 

377 argument, check_generic=True 

378 ): 

379 if originating_module is None: 

380 str_arg = ( 

381 argument.__forward_arg__ 

382 if hasattr(argument, "__forward_arg__") 

383 else str(argument) 

384 ) 

385 raise sa_exc.ArgumentError( 

386 f"Can't use forward ref {argument} for composite " 

387 f"class argument; set up the type as Mapped[{str_arg}]" 

388 ) 

389 argument = de_stringify_annotation( 

390 cls, argument, originating_module, include_generic=True 

391 ) 

392 

393 self.composite_class = argument 

394 

395 if is_dataclass(self.composite_class): 

396 self._setup_for_dataclass( 

397 decl_scan, registry, cls, originating_module, key 

398 ) 

399 else: 

400 for attr in self.attrs: 

401 if ( 

402 isinstance(attr, (MappedColumn, schema.Column)) 

403 and attr.name is None 

404 ): 

405 raise sa_exc.ArgumentError( 

406 "Composite class column arguments must be named " 

407 "unless a dataclass is used" 

408 ) 

409 self._init_accessor() 

410 

411 def _init_accessor(self) -> None: 

412 if is_dataclass(self.composite_class) and not hasattr( 

413 self.composite_class, "__composite_values__" 

414 ): 

415 insp = inspect.signature(self.composite_class) 

416 getter = operator.attrgetter( 

417 *[p.name for p in insp.parameters.values()] 

418 ) 

419 if len(insp.parameters) == 1: 

420 self._generated_composite_accessor = lambda obj: (getter(obj),) 

421 else: 

422 self._generated_composite_accessor = getter 

423 

424 if ( 

425 self.composite_class is not None 

426 and isinstance(self.composite_class, type) 

427 and self.composite_class not in _composite_getters 

428 ): 

429 if self._generated_composite_accessor is not None: 

430 _composite_getters[self.composite_class] = ( 

431 self._generated_composite_accessor 

432 ) 

433 elif hasattr(self.composite_class, "__composite_values__"): 

434 _composite_getters[self.composite_class] = ( 

435 lambda obj: obj.__composite_values__() 

436 ) 

437 

438 @util.preload_module("sqlalchemy.orm.properties") 

439 @util.preload_module("sqlalchemy.orm.decl_base") 

440 def _setup_for_dataclass( 

441 self, 

442 decl_scan: _ClassScanMapperConfig, 

443 registry: _RegistryType, 

444 cls: Type[Any], 

445 originating_module: Optional[str], 

446 key: str, 

447 ) -> None: 

448 MappedColumn = util.preloaded.orm_properties.MappedColumn 

449 

450 decl_base = util.preloaded.orm_decl_base 

451 

452 insp = inspect.signature(self.composite_class) 

453 for param, attr in itertools.zip_longest( 

454 insp.parameters.values(), self.attrs 

455 ): 

456 if param is None: 

457 raise sa_exc.ArgumentError( 

458 f"number of composite attributes " 

459 f"{len(self.attrs)} exceeds " 

460 f"that of the number of attributes in class " 

461 f"{self.composite_class.__name__} {len(insp.parameters)}" 

462 ) 

463 if attr is None: 

464 # fill in missing attr spots with empty MappedColumn 

465 attr = MappedColumn() 

466 self.attrs += (attr,) 

467 

468 if isinstance(attr, MappedColumn): 

469 attr.declarative_scan_for_composite( 

470 decl_scan, 

471 registry, 

472 cls, 

473 originating_module, 

474 key, 

475 param.name, 

476 param.annotation, 

477 ) 

478 elif isinstance(attr, schema.Column): 

479 decl_base._undefer_column_name(param.name, attr) 

480 

481 @util.memoized_property 

482 def _comparable_elements(self) -> Sequence[QueryableAttribute[Any]]: 

483 return [getattr(self.parent.class_, prop.key) for prop in self.props] 

484 

485 @util.memoized_property 

486 @util.preload_module("orm.properties") 

487 def props(self) -> Sequence[MapperProperty[Any]]: 

488 props = [] 

489 MappedColumn = util.preloaded.orm_properties.MappedColumn 

490 

491 for attr in self.attrs: 

492 if isinstance(attr, str): 

493 prop = self.parent.get_property(attr, _configure_mappers=False) 

494 elif isinstance(attr, schema.Column): 

495 prop = self.parent._columntoproperty[attr] 

496 elif isinstance(attr, MappedColumn): 

497 prop = self.parent._columntoproperty[attr.column] 

498 elif isinstance(attr, attributes.InstrumentedAttribute): 

499 prop = attr.property 

500 else: 

501 prop = None 

502 

503 if not isinstance(prop, MapperProperty): 

504 raise sa_exc.ArgumentError( 

505 "Composite expects Column objects or mapped " 

506 f"attributes/attribute names as arguments, got: {attr!r}" 

507 ) 

508 

509 props.append(prop) 

510 return props 

511 

512 def _column_strategy_attrs(self) -> Sequence[QueryableAttribute[Any]]: 

513 return self._comparable_elements 

514 

515 @util.non_memoized_property 

516 @util.preload_module("orm.properties") 

517 def columns(self) -> Sequence[Column[Any]]: 

518 MappedColumn = util.preloaded.orm_properties.MappedColumn 

519 return [ 

520 a.column if isinstance(a, MappedColumn) else a 

521 for a in self.attrs 

522 if isinstance(a, (schema.Column, MappedColumn)) 

523 ] 

524 

525 @property 

526 def mapper_property_to_assign(self) -> Optional[MapperProperty[_CC]]: 

527 return self 

528 

529 @property 

530 def columns_to_assign(self) -> List[Tuple[schema.Column[Any], int]]: 

531 return [(c, 0) for c in self.columns if c.table is None] 

532 

533 @util.preload_module("orm.properties") 

534 def _setup_arguments_on_columns(self) -> None: 

535 """Propagate configuration arguments made on this composite 

536 to the target columns, for those that apply. 

537 

538 """ 

539 ColumnProperty = util.preloaded.orm_properties.ColumnProperty 

540 

541 for prop in self.props: 

542 if not isinstance(prop, ColumnProperty): 

543 continue 

544 else: 

545 cprop = prop 

546 

547 cprop.active_history = self.active_history 

548 if self.deferred: 

549 cprop.deferred = self.deferred 

550 cprop.strategy_key = (("deferred", True), ("instrument", True)) 

551 cprop.group = self.group 

552 

553 def _setup_event_handlers(self) -> None: 

554 """Establish events that populate/expire the composite attribute.""" 

555 

556 def load_handler( 

557 state: InstanceState[Any], context: ORMCompileState 

558 ) -> None: 

559 _load_refresh_handler(state, context, None, is_refresh=False) 

560 

561 def refresh_handler( 

562 state: InstanceState[Any], 

563 context: ORMCompileState, 

564 to_load: Optional[Sequence[str]], 

565 ) -> None: 

566 # note this corresponds to sqlalchemy.ext.mutable load_attrs() 

567 

568 if not to_load or ( 

569 {self.key}.union(self._attribute_keys) 

570 ).intersection(to_load): 

571 _load_refresh_handler(state, context, to_load, is_refresh=True) 

572 

573 def _load_refresh_handler( 

574 state: InstanceState[Any], 

575 context: ORMCompileState, 

576 to_load: Optional[Sequence[str]], 

577 is_refresh: bool, 

578 ) -> None: 

579 dict_ = state.dict 

580 

581 # if context indicates we are coming from the 

582 # fget() handler, this already set the value; skip the 

583 # handler here. (other handlers like mutablecomposite will still 

584 # want to catch it) 

585 # there's an insufficiency here in that the fget() handler 

586 # really should not be using the refresh event and there should 

587 # be some other event that mutablecomposite can subscribe 

588 # towards for this. 

589 

590 if ( 

591 not is_refresh or context is self._COMPOSITE_FGET 

592 ) and self.key in dict_: 

593 return 

594 

595 # if column elements aren't loaded, skip. 

596 # __get__() will initiate a load for those 

597 # columns 

598 for k in self._attribute_keys: 

599 if k not in dict_: 

600 return 

601 

602 dict_[self.key] = self.composite_class( 

603 *[state.dict[key] for key in self._attribute_keys] 

604 ) 

605 

606 def expire_handler( 

607 state: InstanceState[Any], keys: Optional[Sequence[str]] 

608 ) -> None: 

609 if keys is None or set(self._attribute_keys).intersection(keys): 

610 state.dict.pop(self.key, None) 

611 

612 def insert_update_handler( 

613 mapper: Mapper[Any], 

614 connection: Connection, 

615 state: InstanceState[Any], 

616 ) -> None: 

617 """After an insert or update, some columns may be expired due 

618 to server side defaults, or re-populated due to client side 

619 defaults. Pop out the composite value here so that it 

620 recreates. 

621 

622 """ 

623 

624 state.dict.pop(self.key, None) 

625 

626 event.listen( 

627 self.parent, "after_insert", insert_update_handler, raw=True 

628 ) 

629 event.listen( 

630 self.parent, "after_update", insert_update_handler, raw=True 

631 ) 

632 event.listen( 

633 self.parent, "load", load_handler, raw=True, propagate=True 

634 ) 

635 event.listen( 

636 self.parent, "refresh", refresh_handler, raw=True, propagate=True 

637 ) 

638 event.listen( 

639 self.parent, "expire", expire_handler, raw=True, propagate=True 

640 ) 

641 

642 proxy_attr = self.parent.class_manager[self.key] 

643 proxy_attr.impl.dispatch = proxy_attr.dispatch # type: ignore 

644 proxy_attr.impl.dispatch._active_history = self.active_history 

645 

646 # TODO: need a deserialize hook here 

647 

648 @util.memoized_property 

649 def _attribute_keys(self) -> Sequence[str]: 

650 return [prop.key for prop in self.props] 

651 

652 def _populate_composite_bulk_save_mappings_fn( 

653 self, 

654 ) -> Callable[[Dict[str, Any]], None]: 

655 if self._generated_composite_accessor: 

656 get_values = self._generated_composite_accessor 

657 else: 

658 

659 def get_values(val: Any) -> Tuple[Any]: 

660 return val.__composite_values__() # type: ignore 

661 

662 attrs = [prop.key for prop in self.props] 

663 

664 def populate(dest_dict: Dict[str, Any]) -> None: 

665 dest_dict.update( 

666 { 

667 key: val 

668 for key, val in zip( 

669 attrs, get_values(dest_dict.pop(self.key)) 

670 ) 

671 } 

672 ) 

673 

674 return populate 

675 

676 def get_history( 

677 self, 

678 state: InstanceState[Any], 

679 dict_: _InstanceDict, 

680 passive: PassiveFlag = PassiveFlag.PASSIVE_OFF, 

681 ) -> History: 

682 """Provided for userland code that uses attributes.get_history().""" 

683 

684 added: List[Any] = [] 

685 deleted: List[Any] = [] 

686 

687 has_history = False 

688 for prop in self.props: 

689 key = prop.key 

690 hist = state.manager[key].impl.get_history(state, dict_) 

691 if hist.has_changes(): 

692 has_history = True 

693 

694 non_deleted = hist.non_deleted() 

695 if non_deleted: 

696 added.extend(non_deleted) 

697 else: 

698 added.append(None) 

699 if hist.deleted: 

700 deleted.extend(hist.deleted) 

701 else: 

702 deleted.append(None) 

703 

704 if has_history: 

705 return attributes.History( 

706 [self.composite_class(*added)], 

707 (), 

708 [self.composite_class(*deleted)], 

709 ) 

710 else: 

711 return attributes.History((), [self.composite_class(*added)], ()) 

712 

713 def _comparator_factory( 

714 self, mapper: Mapper[Any] 

715 ) -> Composite.Comparator[_CC]: 

716 return self.comparator_factory(self, mapper) 

717 

718 class CompositeBundle(orm_util.Bundle[_T]): 

719 def __init__( 

720 self, 

721 property_: Composite[_T], 

722 expr: ClauseList, 

723 ): 

724 self.property = property_ 

725 super().__init__(property_.key, *expr) 

726 

727 def create_row_processor( 

728 self, 

729 query: Select[Any], 

730 procs: Sequence[Callable[[Row[Any]], Any]], 

731 labels: Sequence[str], 

732 ) -> Callable[[Row[Any]], Any]: 

733 def proc(row: Row[Any]) -> Any: 

734 return self.property.composite_class( 

735 *[proc(row) for proc in procs] 

736 ) 

737 

738 return proc 

739 

740 class Comparator(PropComparator[_PT]): 

741 """Produce boolean, comparison, and other operators for 

742 :class:`.Composite` attributes. 

743 

744 See the example in :ref:`composite_operations` for an overview 

745 of usage , as well as the documentation for :class:`.PropComparator`. 

746 

747 .. seealso:: 

748 

749 :class:`.PropComparator` 

750 

751 :class:`.ColumnOperators` 

752 

753 :ref:`types_operators` 

754 

755 :attr:`.TypeEngine.comparator_factory` 

756 

757 """ 

758 

759 # https://github.com/python/mypy/issues/4266 

760 __hash__ = None # type: ignore 

761 

762 prop: RODescriptorReference[Composite[_PT]] 

763 

764 @util.memoized_property 

765 def clauses(self) -> ClauseList: 

766 return expression.ClauseList( 

767 group=False, *self._comparable_elements 

768 ) 

769 

770 def __clause_element__(self) -> CompositeProperty.CompositeBundle[_PT]: 

771 return self.expression 

772 

773 @util.memoized_property 

774 def expression(self) -> CompositeProperty.CompositeBundle[_PT]: 

775 clauses = self.clauses._annotate( 

776 { 

777 "parententity": self._parententity, 

778 "parentmapper": self._parententity, 

779 "proxy_key": self.prop.key, 

780 } 

781 ) 

782 return CompositeProperty.CompositeBundle(self.prop, clauses) 

783 

784 def _bulk_update_tuples( 

785 self, value: Any 

786 ) -> Sequence[Tuple[_DMLColumnArgument, Any]]: 

787 if isinstance(value, BindParameter): 

788 value = value.value 

789 

790 values: Sequence[Any] 

791 

792 if value is None: 

793 values = [None for key in self.prop._attribute_keys] 

794 elif isinstance(self.prop.composite_class, type) and isinstance( 

795 value, self.prop.composite_class 

796 ): 

797 values = self.prop._composite_values_from_instance( 

798 value # type: ignore[arg-type] 

799 ) 

800 else: 

801 raise sa_exc.ArgumentError( 

802 "Can't UPDATE composite attribute %s to %r" 

803 % (self.prop, value) 

804 ) 

805 

806 return list(zip(self._comparable_elements, values)) 

807 

808 @util.memoized_property 

809 def _comparable_elements(self) -> Sequence[QueryableAttribute[Any]]: 

810 if self._adapt_to_entity: 

811 return [ 

812 getattr(self._adapt_to_entity.entity, prop.key) 

813 for prop in self.prop._comparable_elements 

814 ] 

815 else: 

816 return self.prop._comparable_elements 

817 

818 def __eq__(self, other: Any) -> ColumnElement[bool]: # type: ignore[override] # noqa: E501 

819 return self._compare(operators.eq, other) 

820 

821 def __ne__(self, other: Any) -> ColumnElement[bool]: # type: ignore[override] # noqa: E501 

822 return self._compare(operators.ne, other) 

823 

824 def __lt__(self, other: Any) -> ColumnElement[bool]: 

825 return self._compare(operators.lt, other) 

826 

827 def __gt__(self, other: Any) -> ColumnElement[bool]: 

828 return self._compare(operators.gt, other) 

829 

830 def __le__(self, other: Any) -> ColumnElement[bool]: 

831 return self._compare(operators.le, other) 

832 

833 def __ge__(self, other: Any) -> ColumnElement[bool]: 

834 return self._compare(operators.ge, other) 

835 

836 # what might be interesting would be if we create 

837 # an instance of the composite class itself with 

838 # the columns as data members, then use "hybrid style" comparison 

839 # to create these comparisons. then your Point.__eq__() method could 

840 # be where comparison behavior is defined for SQL also. Likely 

841 # not a good choice for default behavior though, not clear how it would 

842 # work w/ dataclasses, etc. also no demand for any of this anyway. 

843 def _compare( 

844 self, operator: OperatorType, other: Any 

845 ) -> ColumnElement[bool]: 

846 values: Sequence[Any] 

847 if other is None: 

848 values = [None] * len(self.prop._comparable_elements) 

849 else: 

850 values = self.prop._composite_values_from_instance(other) 

851 comparisons = [ 

852 operator(a, b) 

853 for a, b in zip(self.prop._comparable_elements, values) 

854 ] 

855 if self._adapt_to_entity: 

856 assert self.adapter is not None 

857 comparisons = [self.adapter(x) for x in comparisons] 

858 return sql.and_(*comparisons) 

859 

860 def __str__(self) -> str: 

861 return str(self.parent.class_.__name__) + "." + self.key 

862 

863 

864class Composite(CompositeProperty[_T], _DeclarativeMapped[_T]): 

865 """Declarative-compatible front-end for the :class:`.CompositeProperty` 

866 class. 

867 

868 Public constructor is the :func:`_orm.composite` function. 

869 

870 .. versionchanged:: 2.0 Added :class:`_orm.Composite` as a Declarative 

871 compatible subclass of :class:`_orm.CompositeProperty`. 

872 

873 .. seealso:: 

874 

875 :ref:`mapper_composite` 

876 

877 """ 

878 

879 inherit_cache = True 

880 """:meta private:""" 

881 

882 

883class ConcreteInheritedProperty(DescriptorProperty[_T]): 

884 """A 'do nothing' :class:`.MapperProperty` that disables 

885 an attribute on a concrete subclass that is only present 

886 on the inherited mapper, not the concrete classes' mapper. 

887 

888 Cases where this occurs include: 

889 

890 * When the superclass mapper is mapped against a 

891 "polymorphic union", which includes all attributes from 

892 all subclasses. 

893 * When a relationship() is configured on an inherited mapper, 

894 but not on the subclass mapper. Concrete mappers require 

895 that relationship() is configured explicitly on each 

896 subclass. 

897 

898 """ 

899 

900 def _comparator_factory( 

901 self, mapper: Mapper[Any] 

902 ) -> Type[PropComparator[_T]]: 

903 comparator_callable = None 

904 

905 for m in self.parent.iterate_to_root(): 

906 p = m._props[self.key] 

907 if getattr(p, "comparator_factory", None) is not None: 

908 comparator_callable = p.comparator_factory 

909 break 

910 assert comparator_callable is not None 

911 return comparator_callable(p, mapper) # type: ignore 

912 

913 def __init__(self) -> None: 

914 super().__init__() 

915 

916 def warn() -> NoReturn: 

917 raise AttributeError( 

918 "Concrete %s does not implement " 

919 "attribute %r at the instance level. Add " 

920 "this property explicitly to %s." 

921 % (self.parent, self.key, self.parent) 

922 ) 

923 

924 class NoninheritedConcreteProp: 

925 def __set__(s: Any, obj: Any, value: Any) -> NoReturn: 

926 warn() 

927 

928 def __delete__(s: Any, obj: Any) -> NoReturn: 

929 warn() 

930 

931 def __get__(s: Any, obj: Any, owner: Any) -> Any: 

932 if obj is None: 

933 return self.descriptor 

934 warn() 

935 

936 self.descriptor = NoninheritedConcreteProp() 

937 

938 

939class SynonymProperty(DescriptorProperty[_T]): 

940 """Denote an attribute name as a synonym to a mapped property, 

941 in that the attribute will mirror the value and expression behavior 

942 of another attribute. 

943 

944 :class:`.Synonym` is constructed using the :func:`_orm.synonym` 

945 function. 

946 

947 .. seealso:: 

948 

949 :ref:`synonyms` - Overview of synonyms 

950 

951 """ 

952 

953 comparator_factory: Optional[Type[PropComparator[_T]]] 

954 

955 def __init__( 

956 self, 

957 name: str, 

958 map_column: Optional[bool] = None, 

959 descriptor: Optional[Any] = None, 

960 comparator_factory: Optional[Type[PropComparator[_T]]] = None, 

961 attribute_options: Optional[_AttributeOptions] = None, 

962 info: Optional[_InfoType] = None, 

963 doc: Optional[str] = None, 

964 ): 

965 super().__init__(attribute_options=attribute_options) 

966 

967 self.name = name 

968 self.map_column = map_column 

969 self.descriptor = descriptor 

970 self.comparator_factory = comparator_factory 

971 if doc: 

972 self.doc = doc 

973 elif descriptor and descriptor.__doc__: 

974 self.doc = descriptor.__doc__ 

975 else: 

976 self.doc = None 

977 if info: 

978 self.info.update(info) 

979 

980 util.set_creation_order(self) 

981 

982 if not TYPE_CHECKING: 

983 

984 @property 

985 def uses_objects(self) -> bool: 

986 return getattr(self.parent.class_, self.name).impl.uses_objects 

987 

988 # TODO: when initialized, check _proxied_object, 

989 # emit a warning if its not a column-based property 

990 

991 @util.memoized_property 

992 def _proxied_object( 

993 self, 

994 ) -> Union[MapperProperty[_T], SQLORMOperations[_T]]: 

995 attr = getattr(self.parent.class_, self.name) 

996 if not hasattr(attr, "property") or not isinstance( 

997 attr.property, MapperProperty 

998 ): 

999 # attribute is a non-MapperProprerty proxy such as 

1000 # hybrid or association proxy 

1001 if isinstance(attr, attributes.QueryableAttribute): 

1002 return attr.comparator 

1003 elif isinstance(attr, SQLORMOperations): 

1004 # assocaition proxy comes here 

1005 return attr 

1006 

1007 raise sa_exc.InvalidRequestError( 

1008 """synonym() attribute "%s.%s" only supports """ 

1009 """ORM mapped attributes, got %r""" 

1010 % (self.parent.class_.__name__, self.name, attr) 

1011 ) 

1012 return attr.property 

1013 

1014 def _column_strategy_attrs(self) -> Sequence[QueryableAttribute[Any]]: 

1015 return (getattr(self.parent.class_, self.name),) 

1016 

1017 def _comparator_factory(self, mapper: Mapper[Any]) -> SQLORMOperations[_T]: 

1018 prop = self._proxied_object 

1019 

1020 if isinstance(prop, MapperProperty): 

1021 if self.comparator_factory: 

1022 comp = self.comparator_factory(prop, mapper) 

1023 else: 

1024 comp = prop.comparator_factory(prop, mapper) 

1025 return comp 

1026 else: 

1027 return prop 

1028 

1029 def get_history( 

1030 self, 

1031 state: InstanceState[Any], 

1032 dict_: _InstanceDict, 

1033 passive: PassiveFlag = PassiveFlag.PASSIVE_OFF, 

1034 ) -> History: 

1035 attr: QueryableAttribute[Any] = getattr(self.parent.class_, self.name) 

1036 return attr.impl.get_history(state, dict_, passive=passive) 

1037 

1038 @util.preload_module("sqlalchemy.orm.properties") 

1039 def set_parent(self, parent: Mapper[Any], init: bool) -> None: 

1040 properties = util.preloaded.orm_properties 

1041 

1042 if self.map_column: 

1043 # implement the 'map_column' option. 

1044 if self.key not in parent.persist_selectable.c: 

1045 raise sa_exc.ArgumentError( 

1046 "Can't compile synonym '%s': no column on table " 

1047 "'%s' named '%s'" 

1048 % ( 

1049 self.name, 

1050 parent.persist_selectable.description, 

1051 self.key, 

1052 ) 

1053 ) 

1054 elif ( 

1055 parent.persist_selectable.c[self.key] 

1056 in parent._columntoproperty 

1057 and parent._columntoproperty[ 

1058 parent.persist_selectable.c[self.key] 

1059 ].key 

1060 == self.name 

1061 ): 

1062 raise sa_exc.ArgumentError( 

1063 "Can't call map_column=True for synonym %r=%r, " 

1064 "a ColumnProperty already exists keyed to the name " 

1065 "%r for column %r" 

1066 % (self.key, self.name, self.name, self.key) 

1067 ) 

1068 p: ColumnProperty[Any] = properties.ColumnProperty( 

1069 parent.persist_selectable.c[self.key] 

1070 ) 

1071 parent._configure_property(self.name, p, init=init, setparent=True) 

1072 p._mapped_by_synonym = self.key 

1073 

1074 self.parent = parent 

1075 

1076 

1077class Synonym(SynonymProperty[_T], _DeclarativeMapped[_T]): 

1078 """Declarative front-end for the :class:`.SynonymProperty` class. 

1079 

1080 Public constructor is the :func:`_orm.synonym` function. 

1081 

1082 .. versionchanged:: 2.0 Added :class:`_orm.Synonym` as a Declarative 

1083 compatible subclass for :class:`_orm.SynonymProperty` 

1084 

1085 .. seealso:: 

1086 

1087 :ref:`synonyms` - Overview of synonyms 

1088 

1089 """ 

1090 

1091 inherit_cache = True 

1092 """:meta private:"""