Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/writeonly.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

286 statements  

1# orm/writeonly.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Write-only collection API. 

9 

10This is an alternate mapped attribute style that only supports single-item 

11collection mutation operations. To read the collection, a select() 

12object must be executed each time. 

13 

14.. versionadded:: 2.0 

15 

16 

17""" 

18 

19from __future__ import annotations 

20 

21from typing import Any 

22from typing import Collection 

23from typing import Dict 

24from typing import Generic 

25from typing import Iterable 

26from typing import Iterator 

27from typing import List 

28from typing import NoReturn 

29from typing import Optional 

30from typing import overload 

31from typing import Tuple 

32from typing import Type 

33from typing import TYPE_CHECKING 

34from typing import TypeVar 

35from typing import Union 

36 

37from sqlalchemy.sql import bindparam 

38from . import attributes 

39from . import interfaces 

40from . import relationships 

41from . import strategies 

42from .base import ATTR_EMPTY 

43from .base import NEVER_SET 

44from .base import object_mapper 

45from .base import PassiveFlag 

46from .base import RelationshipDirection 

47from .. import exc 

48from .. import inspect 

49from .. import log 

50from .. import util 

51from ..sql import delete 

52from ..sql import insert 

53from ..sql import select 

54from ..sql import update 

55from ..sql.dml import Delete 

56from ..sql.dml import Insert 

57from ..sql.dml import Update 

58from ..util.typing import Literal 

59 

60if TYPE_CHECKING: 

61 from . import QueryableAttribute 

62 from ._typing import _InstanceDict 

63 from .attributes import AttributeEventToken 

64 from .base import LoaderCallableStatus 

65 from .collections import _AdaptedCollectionProtocol 

66 from .collections import CollectionAdapter 

67 from .mapper import Mapper 

68 from .relationships import _RelationshipOrderByArg 

69 from .state import InstanceState 

70 from .util import AliasedClass 

71 from ..event import _Dispatch 

72 from ..sql.selectable import FromClause 

73 from ..sql.selectable import Select 

74 

# Type variable parameterizing the generic classes in this module
# (e.g. ``WriteOnlyHistory[_T]`` and ``WriteOnlyCollection[_T]``).
_T = TypeVar("_T", bound=Any)

76 

77 

class WriteOnlyHistory(Generic[_T]):
    """Change record for a write-only collection attribute.

    Three ordered identity sets are maintained -- ``added_items``,
    ``unchanged_items`` and ``deleted_items`` -- and they are updated
    directly through :meth:`add_added` and :meth:`add_removed`.
    """

    unchanged_items: util.OrderedIdentitySet
    added_items: util.OrderedIdentitySet
    deleted_items: util.OrderedIdentitySet
    _reconcile_collection: bool

    def __init__(
        self,
        attr: _WriteOnlyAttributeImpl,
        state: InstanceState[_T],
        passive: PassiveFlag,
        apply_to: Optional[WriteOnlyHistory[_T]] = None,
    ) -> None:
        """Create an empty history, or one sharing ``apply_to``'s sets.

        :param attr: the owning attribute implementation; only used to
         format the error message.
        :param state: instance state of the parent object (not read here).
        :param passive: loader flags of the requesting operation.
        :param apply_to: an existing history whose item sets are re-used
         (by reference, not copied) by the new object.
        :raises InvalidRequestError: when ``apply_to`` is given and
         ``passive`` includes ``SQL_OK``.
        """
        if not apply_to:
            # nothing to build upon: start with empty sets
            self.deleted_items = util.OrderedIdentitySet()
            self.added_items = util.OrderedIdentitySet()
            self.unchanged_items = util.OrderedIdentitySet()
            self._reconcile_collection = False
            return

        if passive & PassiveFlag.SQL_OK:
            raise exc.InvalidRequestError(
                f"Attribute {attr} can't load the existing state from the "
                "database for this operation; full iteration is not "
                "permitted. If this is a delete operation, configure "
                f"passive_deletes=True on the {attr} relationship in "
                "order to resolve this error."
            )

        self.unchanged_items = apply_to.unchanged_items
        self.added_items = apply_to.added_items
        self.deleted_items = apply_to.deleted_items
        self._reconcile_collection = apply_to._reconcile_collection

    @property
    def added_plus_unchanged(self) -> List[_T]:
        """List of the added items followed by the unchanged items."""
        combined = self.added_items.union(self.unchanged_items)
        return list(combined)

    @property
    def all_items(self) -> List[_T]:
        """List of the union of added, unchanged and deleted items."""
        present = self.added_items.union(self.unchanged_items)
        everything = present.union(self.deleted_items)
        return list(everything)

    def as_history(self) -> attributes.History:
        """Return the tracked sets as an :class:`.attributes.History`.

        When ``_reconcile_collection`` is set, the three sets are first
        reconciled against ``unchanged_items``; otherwise they are passed
        through as-is.
        """
        if not self._reconcile_collection:
            return attributes.History(
                list(self.added_items),
                list(self.unchanged_items),
                list(self.deleted_items),
            )

        # only items not already present count as added, and only items
        # that were present count as deleted
        added = self.added_items.difference(self.unchanged_items)
        deleted = self.deleted_items.intersection(self.unchanged_items)
        unchanged = self.unchanged_items.difference(deleted)
        return attributes.History(list(added), list(unchanged), list(deleted))

    def indexed(self, index: Union[int, slice]) -> Union[List[_T], _T]:
        """Index or slice into the added items, in their set order."""
        pending = list(self.added_items)
        return pending[index]

    def add_added(self, value: _T) -> None:
        """Record ``value`` as added."""
        self.added_items.add(value)

    def add_removed(self, value: _T) -> None:
        """Record the removal of ``value``.

        Removing an item that is currently recorded as added simply cancels
        that addition; any other item is recorded as deleted.
        """
        if value in self.added_items:
            self.added_items.remove(value)
            return
        self.deleted_items.add(value)

149 

150 

class _WriteOnlyAttributeImpl(
    attributes._HasCollectionAdapter, attributes._AttributeImpl
):
    """Attribute implementation for write-only relationship collections.

    Appends and removes are recorded in a :class:`.WriteOnlyHistory` kept
    in ``state.committed_state`` under this attribute's key.  Reading the
    attribute with SQL permitted produces a ``query_class`` object instead
    of loaded collection data.
    """

    uses_objects: bool = True
    default_accepts_scalar_loader: bool = False
    supports_population: bool = False
    _supports_dynamic_iteration: bool = False
    collection: bool = False
    dynamic: bool = True
    order_by: _RelationshipOrderByArg = ()
    collection_history_cls: Type[WriteOnlyHistory[Any]] = WriteOnlyHistory

    query_class: Type[WriteOnlyCollection[Any]]

    def __init__(
        self,
        class_: Union[Type[Any], AliasedClass[Any]],
        key: str,
        dispatch: _Dispatch[QueryableAttribute[Any]],
        target_mapper: Mapper[_T],
        order_by: _RelationshipOrderByArg,
        **kw: Any,
    ):
        """Set up the attribute.

        :param class_: owning class, forwarded to the base constructor.
        :param key: attribute name, forwarded to the base constructor.
        :param dispatch: event dispatcher, forwarded to the base
         constructor.
        :param target_mapper: mapper of the collection's member class.
        :param order_by: ordering criteria; when non-empty it is stored as
         a tuple, otherwise the class-level default ``()`` stays in effect.
        :param kw: further keyword arguments for the base constructor.
        """
        super().__init__(class_, key, None, dispatch, **kw)
        self.target_mapper = target_mapper
        self.query_class = WriteOnlyCollection
        if order_by:
            self.order_by = tuple(order_by)

    def get(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
    ) -> Union[util.OrderedIdentitySet, WriteOnlyCollection[Any]]:
        """Return the attribute value for ``state``.

        With ``SQL_OK`` present in ``passive`` a new ``query_class``
        instance bound to this attribute and ``state`` is returned;
        otherwise the set of pending added items is returned.
        """
        if passive & PassiveFlag.SQL_OK:
            return self.query_class(self, state)

        history = self._get_collection_history(
            state, PassiveFlag.PASSIVE_NO_INITIALIZE
        )
        return history.added_items

    @overload
    def get_collection(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        user_data: Literal[None] = ...,
        passive: Literal[PassiveFlag.PASSIVE_OFF] = ...,
    ) -> CollectionAdapter: ...

    @overload
    def get_collection(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        user_data: _AdaptedCollectionProtocol = ...,
        passive: PassiveFlag = ...,
    ) -> CollectionAdapter: ...

    @overload
    def get_collection(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        user_data: Optional[_AdaptedCollectionProtocol] = ...,
        passive: PassiveFlag = ...,
    ) -> Union[
        Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
    ]: ...

    def get_collection(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        user_data: Optional[_AdaptedCollectionProtocol] = None,
        passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
    ) -> Union[
        Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
    ]:
        """Return a :class:`._DynamicCollectionAdapter` over tracked items.

        The adapter wraps the added-plus-unchanged items when ``passive``
        includes ``SQL_OK`` and only the added items otherwise.
        ``dict_`` and ``user_data`` are not used.
        """
        sql_ok = passive & PassiveFlag.SQL_OK
        history = self._get_collection_history(state, passive)

        data: Collection[Any]
        if sql_ok:
            data = history.added_plus_unchanged
        else:
            data = history.added_items
        return _DynamicCollectionAdapter(data)  # type: ignore[return-value]

    @util.memoized_property
    def _append_token(self) -> attributes.AttributeEventToken:
        """Event token identifying an append that originates here."""
        return attributes.AttributeEventToken(self, attributes.OP_APPEND)

    @util.memoized_property
    def _remove_token(self) -> attributes.AttributeEventToken:
        """Event token identifying a remove that originates here."""
        return attributes.AttributeEventToken(self, attributes.OP_REMOVE)

    def fire_append_event(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
        collection_history: Optional[WriteOnlyHistory[Any]] = None,
    ) -> None:
        """Record ``value`` as added and run the ``append`` listeners.

        The history is obtained from :meth:`_modified_event` unless one is
        passed in.  Each listener may replace the value; the final value
        has its parent flag set when ``trackparent`` is enabled.
        """
        if collection_history is None:
            collection_history = self._modified_event(state, dict_)

        # the history records the value as originally passed, i.e. before
        # any listener has had a chance to replace it
        collection_history.add_added(value)

        for listener in self.dispatch.append:
            value = listener(state, value, initiator or self._append_token)

        if self.trackparent and value is not None:
            self.sethasparent(attributes.instance_state(value), state, True)

    def fire_remove_event(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
        collection_history: Optional[WriteOnlyHistory[Any]] = None,
    ) -> None:
        """Record ``value`` as removed and run the ``remove`` listeners.

        The history is obtained from :meth:`_modified_event` unless one is
        passed in.  The parent flag of ``value`` is cleared (when
        ``trackparent`` is enabled) before the listeners are invoked.
        """
        if collection_history is None:
            collection_history = self._modified_event(state, dict_)

        collection_history.add_removed(value)

        if self.trackparent and value is not None:
            self.sethasparent(attributes.instance_state(value), state, False)

        for listener in self.dispatch.remove:
            listener(state, value, initiator or self._remove_token)

    def _modified_event(
        self, state: InstanceState[Any], dict_: _InstanceDict
    ) -> WriteOnlyHistory[Any]:
        """Flag the attribute as modified and return its history object.

        A new history is stored in ``state.committed_state`` if none exists
        yet for this attribute's key.
        """
        key = self.key
        if key not in state.committed_state:
            state.committed_state[key] = self.collection_history_cls(
                self, state, PassiveFlag.PASSIVE_NO_FETCH
            )

        state._modified_event(dict_, self, NEVER_SET)

        # this is a hack to allow the entities.ComparableEntity fixture
        # to work
        dict_[key] = True
        return state.committed_state[key]  # type: ignore[no-any-return]

    def set(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken] = None,
        passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
        check_old: Any = None,
        pop: bool = False,
        _adapt: bool = True,
    ) -> None:
        """Replace the collection with the members of iterable ``value``.

        Append events are fired for new members (in the order given) and
        remove events for members that are no longer present.

        Nothing happens when ``initiator`` carries this attribute's own
        parent token, or when ``pop`` is set and ``value`` is ``None``.

        :raises InvalidRequestError: if ``state`` has an identity and
         ``_supports_dynamic_iteration`` is false.
        """
        if initiator and initiator.parent_token is self.parent_token:
            return

        if pop and value is None:
            return

        new_values = list(value)

        if state.has_identity:
            if not self._supports_dynamic_iteration:
                raise exc.InvalidRequestError(
                    f'Collection "{self}" does not support implicit '
                    "iteration; collection replacement operations "
                    "can't be used"
                )
            old_collection = util.IdentitySet(
                self.get(state, dict_, passive=passive)
            )

        collection_history = self._modified_event(state, dict_)
        if state.has_identity:
            # existing contents plus whatever is pending
            old_collection = old_collection.union(
                collection_history.added_items
            )
        else:
            # no identity: only pending additions can exist
            old_collection = collection_history.added_items

        constants = old_collection.intersection(new_values)
        additions = util.IdentitySet(new_values).difference(constants)
        removals = old_collection.difference(constants)

        for member in new_values:
            if member not in additions:
                continue
            self.fire_append_event(
                state,
                dict_,
                member,
                None,
                collection_history=collection_history,
            )

        for member in removals:
            self.fire_remove_event(
                state,
                dict_,
                member,
                None,
                collection_history=collection_history,
            )

    def delete(self, *args: Any, **kwargs: Any) -> NoReturn:
        """Not supported; always raises :class:`NotImplementedError`."""
        raise NotImplementedError()

    def set_committed_value(
        self, state: InstanceState[Any], dict_: _InstanceDict, value: Any
    ) -> NoReturn:
        """Not supported; always raises :class:`NotImplementedError`."""
        raise NotImplementedError(
            "Dynamic attributes don't support collection population."
        )

    def get_history(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
    ) -> attributes.History:
        """Return the collection history as an ``attributes.History``."""
        history = self._get_collection_history(state, passive)
        return history.as_history()

    def get_all_pending(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        passive: PassiveFlag = PassiveFlag.PASSIVE_NO_INITIALIZE,
    ) -> List[Tuple[InstanceState[Any], Any]]:
        """Return ``(instance_state, object)`` pairs for all tracked items.

        Covers every item in the history's ``all_items``.
        """
        history = self._get_collection_history(state, passive)
        return [
            (attributes.instance_state(obj), obj) for obj in history.all_items
        ]

    def _default_value(
        self, state: InstanceState[Any], dict_: _InstanceDict
    ) -> Any:
        """Run the ``init_scalar`` listeners and return the resulting value.

        Starts from ``None``; each listener result other than
        ``ATTR_EMPTY`` replaces the value passed to the next listener.
        """
        current = None
        for listener in self.dispatch.init_scalar:
            produced = listener(state, current, dict_)
            if produced is not ATTR_EMPTY:
                current = produced

        return current

    def _get_collection_history(
        self, state: InstanceState[Any], passive: PassiveFlag
    ) -> WriteOnlyHistory[Any]:
        """Return the history for ``state``, creating one if necessary.

        A history created here is not stored in ``state.committed_state``.
        When ``state`` has an identity and ``passive`` includes
        ``INIT_OK``, a new history applied on top of the current one is
        returned; constructing it raises if ``passive`` permits SQL.
        """
        key = self.key
        committed = state.committed_state

        current: WriteOnlyHistory[Any]
        if key in committed:
            current = committed[key]
        else:
            current = self.collection_history_cls(
                self, state, PassiveFlag.PASSIVE_NO_FETCH
            )

        if not (state.has_identity and (passive & PassiveFlag.INIT_OK)):
            return current

        return self.collection_history_cls(
            self, state, passive, apply_to=current
        )

    def append(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
        passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
    ) -> None:
        """Fire an append event for ``value`` unless ``initiator`` is self."""
        if initiator is self:
            return
        self.fire_append_event(state, dict_, value, initiator)

    def remove(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
        passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
    ) -> None:
        """Fire a remove event for ``value`` unless ``initiator`` is self."""
        if initiator is self:
            return
        self.fire_remove_event(state, dict_, value, initiator)

    def pop(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
        passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
    ) -> None:
        """Delegate to :meth:`remove` with the same arguments."""
        self.remove(state, dict_, value, initiator, passive=passive)

449 

450 

@log.class_logger
@relationships.RelationshipProperty.strategy_for(lazy="write_only")
class _WriteOnlyLoader(strategies._AbstractRelationshipLoader, log.Identified):
    """Relationship loader strategy registered for ``lazy="write_only"``."""

    impl_class = _WriteOnlyAttributeImpl

    def init_class_attribute(self, mapper: Mapper[Any]) -> None:
        """Register the class-level attribute using ``impl_class``.

        :raises InvalidRequestError: if the relationship is not a list
         (``uselist`` false) or its direction is neither one-to-many nor
         many-to-many.
        """
        self.is_class_level = True

        prop = self.parent_property
        collection_directions = (
            interfaces.ONETOMANY,
            interfaces.MANYTOMANY,
        )
        if not self.uselist or prop.direction not in collection_directions:
            raise exc.InvalidRequestError(
                "On relationship %s, 'dynamic' loaders cannot be used with "
                "many-to-one/one-to-one relationships and/or "
                "uselist=False." % self.parent_property
            )

        strategies._register_attribute(  # type: ignore[no-untyped-call]
            self.parent_property,
            mapper,
            useobject=True,
            impl_class=self.impl_class,
            target_mapper=self.parent_property.mapper,
            order_by=self.parent_property.order_by,
            query_class=self.parent_property.query_class,
        )

477 

478 

479class _DynamicCollectionAdapter: 

480 """simplified CollectionAdapter for internal API consistency""" 

481 

482 data: Collection[Any] 

483 

484 def __init__(self, data: Collection[Any]): 

485 self.data = data 

486 

487 def __iter__(self) -> Iterator[Any]: 

488 return iter(self.data) 

489 

490 def _reset_empty(self) -> None: 

491 pass 

492 

493 def __len__(self) -> int: 

494 return len(self.data) 

495 

496 def __bool__(self) -> bool: 

497 return True 

498 

499 

class _AbstractCollectionWriter(Generic[_T]):
    """Base for virtual collections whose add/remove operations are routed
    through the owning attribute implementation, and thereby into the
    attribute event system.

    """

    if not TYPE_CHECKING:
        __slots__ = ()

    instance: _T
    _from_obj: Tuple[FromClause, ...]

    def __init__(
        self, attr: _WriteOnlyAttributeImpl, state: InstanceState[_T]
    ):
        """Bind the collection to the object referred to by ``state``.

        Pre-computes the FROM objects, WHERE criteria and ORDER BY clauses
        that describe the collection of the parent instance.
        """
        instance = state.obj()
        if TYPE_CHECKING:
            assert instance
        self.instance = instance
        self.attr = attr

        prop = object_mapper(instance)._props[attr.key]
        secondary = prop.secondary

        # The non-empty case is a hack right now.  The Query only knows
        # how to make subsequent joins() without a given left-hand side
        # from self._from_obj[0], and prop.secondary has to be in the FROM.
        # So the mapper selectable is purposely placed in _from_obj[0] to
        # ensure a user-defined join() later on doesn't fail, with
        # secondary following in _from_obj[1].
        #
        # Note also that the official ORM-annotated selectable from
        # __clause_element__() is used, see #7868
        self._from_obj = (
            ()
            if secondary is None
            else (prop.mapper.__clause_element__(), secondary)
        )

        self._where_criteria = (
            prop._with_parent(instance, alias_secondary=False),
        )

        self._order_by_clauses = attr.order_by or ()

    def _add_all_impl(self, iterator: Iterable[_T]) -> None:
        """Append each item of ``iterator`` through the attribute impl."""
        for member in iterator:
            self.attr.append(
                attributes.instance_state(self.instance),
                attributes.instance_dict(self.instance),
                member,
                None,
            )

    def _remove_impl(self, item: _T) -> None:
        """Remove ``item`` through the attribute impl."""
        owner = self.instance
        self.attr.remove(
            attributes.instance_state(owner),
            attributes.instance_dict(owner),
            item,
            None,
        )

563 

564 

class WriteOnlyCollection(_AbstractCollectionWriter[_T]):
    """Write-only collection which can synchronize changes into the
    attribute event system.

    A :class:`.WriteOnlyCollection` is what a mapping receives when the
    ``"write_only"`` lazy loading strategy is used with
    :func:`_orm.relationship`.  For background on this configuration,
    see :ref:`write_only_relationship`.

    .. versionadded:: 2.0

    .. seealso::

        :ref:`write_only_relationship`

    """

    __slots__ = (
        "instance",
        "attr",
        "_where_criteria",
        "_from_obj",
        "_order_by_clauses",
    )

    def __iter__(self) -> NoReturn:
        """Always raise :class:`TypeError`; iteration is not supported."""
        raise TypeError(
            "WriteOnly collections don't support iteration in-place; "
            "to query for collection items, use the select() method to "
            "produce a SQL statement and execute it with session.scalars()."
        )

    def select(self) -> Select[_T]:
        """Produce a :class:`_sql.Select` construct that represents the
        rows within this instance-local :class:`_orm.WriteOnlyCollection`.

        The statement selects the target mapper, filtered by the
        collection's criteria; explicit FROM objects and ORDER BY clauses
        are applied when present.

        """
        target = self.attr.target_mapper
        statement = select(target).where(*self._where_criteria)

        from_objects = self._from_obj
        if from_objects:
            statement = statement.select_from(*from_objects)

        ordering = self._order_by_clauses
        if ordering:
            statement = statement.order_by(*ordering)

        return statement

    def insert(self) -> Insert:
        """For one-to-many collections, produce a :class:`_dml.Insert` which
        will insert new rows in terms of this instance-local
        :class:`_orm.WriteOnlyCollection`.

        This construct is only supported for a :class:`_orm.Relationship`
        that does **not** include the :paramref:`_orm.relationship.secondary`
        parameter.  For relationships that refer to a many-to-many table,
        use ordinary bulk insert techniques to produce new objects, then
        use :meth:`_orm.WriteOnlyCollection.add_all` to associate them
        with the collection.

        :raises InvalidRequestError: if the relationship direction is not
         one-to-many.

        """

        state = inspect(self.instance)
        mapper = state.mapper
        prop = mapper._props[self.attr.key]

        if prop.direction is not RelationshipDirection.ONETOMANY:
            raise exc.InvalidRequestError(
                "Write only bulk INSERT only supported for one-to-many "
                "collections; for many-to-many, use a separate bulk "
                "INSERT along with add_all()."
            )

        # one bound parameter per synchronized column pair, keyed by the
        # second column of the pair; its value comes from a callable built
        # for the first column against the parent state
        values: Dict[str, Any] = {}

        for parent_col, child_col in prop.synchronize_pairs:
            getter = prop._get_attr_w_warn_on_none(
                mapper,
                state,
                state.dict,
                parent_col,
            )

            values[child_col.key] = bindparam(None, callable_=getter)

        return insert(self.attr.target_mapper).values(**values)

    def update(self) -> Update:
        """Produce a :class:`_dml.Update` which will refer to rows in terms
        of this instance-local :class:`_orm.WriteOnlyCollection`.

        """
        statement = update(self.attr.target_mapper)
        return statement.where(*self._where_criteria)

    def delete(self) -> Delete:
        """Produce a :class:`_dml.Delete` which will refer to rows in terms
        of this instance-local :class:`_orm.WriteOnlyCollection`.

        """
        statement = delete(self.attr.target_mapper)
        return statement.where(*self._where_criteria)

    def add_all(self, iterator: Iterable[_T]) -> None:
        """Add an iterable of items to this :class:`_orm.WriteOnlyCollection`.

        The given items will be persisted to the database in terms of
        the parent instance's collection on the next flush.

        """
        self._add_all_impl(iterator)

    def add(self, item: _T) -> None:
        """Add an item to this :class:`_orm.WriteOnlyCollection`.

        The given item will be persisted to the database in terms of
        the parent instance's collection on the next flush.

        """
        self._add_all_impl([item])

    def remove(self, item: _T) -> None:
        """Remove an item from this :class:`_orm.WriteOnlyCollection`.

        The given item will be removed from the parent instance's collection
        on the next flush.

        """
        self._remove_impl(item)