Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1439 statements  

1# orm/session.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Provides the Session class and related utilities.""" 

9 

10from __future__ import annotations 

11 

12import contextlib 

13from enum import Enum 

14import itertools 

15import sys 

16import typing 

17from typing import Any 

18from typing import Callable 

19from typing import cast 

20from typing import Dict 

21from typing import Generic 

22from typing import Iterable 

23from typing import Iterator 

24from typing import List 

25from typing import NoReturn 

26from typing import Optional 

27from typing import overload 

28from typing import Sequence 

29from typing import Set 

30from typing import Tuple 

31from typing import Type 

32from typing import TYPE_CHECKING 

33from typing import TypeVar 

34from typing import Union 

35import weakref 

36 

37from . import attributes 

38from . import bulk_persistence 

39from . import context 

40from . import descriptor_props 

41from . import exc 

42from . import identity 

43from . import loading 

44from . import query 

45from . import state as statelib 

46from ._typing import _O 

47from ._typing import insp_is_mapper 

48from ._typing import is_composite_class 

49from ._typing import is_orm_option 

50from ._typing import is_user_defined_option 

51from .base import _class_to_mapper 

52from .base import _none_set 

53from .base import _state_mapper 

54from .base import instance_str 

55from .base import LoaderCallableStatus 

56from .base import object_mapper 

57from .base import object_state 

58from .base import PassiveFlag 

59from .base import state_str 

60from .context import FromStatement 

61from .context import ORMCompileState 

62from .identity import IdentityMap 

63from .query import Query 

64from .state import InstanceState 

65from .state_changes import _StateChange 

66from .state_changes import _StateChangeState 

67from .state_changes import _StateChangeStates 

68from .unitofwork import UOWTransaction 

69from .. import engine 

70from .. import exc as sa_exc 

71from .. import sql 

72from .. import util 

73from ..engine import Connection 

74from ..engine import Engine 

75from ..engine.util import TransactionalContext 

76from ..event import dispatcher 

77from ..event import EventTarget 

78from ..inspection import inspect 

79from ..inspection import Inspectable 

80from ..sql import coercions 

81from ..sql import dml 

82from ..sql import roles 

83from ..sql import Select 

84from ..sql import TableClause 

85from ..sql import visitors 

86from ..sql.base import _NoArg 

87from ..sql.base import CompileState 

88from ..sql.schema import Table 

89from ..sql.selectable import ForUpdateArg 

90from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL 

91from ..util import IdentitySet 

92from ..util.typing import Literal 

93from ..util.typing import Protocol 

94 

95if typing.TYPE_CHECKING: 

96 from ._typing import _EntityType 

97 from ._typing import _IdentityKeyType 

98 from ._typing import _InstanceDict 

99 from ._typing import OrmExecuteOptionsParameter 

100 from .interfaces import ORMOption 

101 from .interfaces import UserDefinedOption 

102 from .mapper import Mapper 

103 from .path_registry import PathRegistry 

104 from .query import RowReturningQuery 

105 from ..engine import CursorResult 

106 from ..engine import Result 

107 from ..engine import Row 

108 from ..engine import RowMapping 

109 from ..engine.base import Transaction 

110 from ..engine.base import TwoPhaseTransaction 

111 from ..engine.interfaces import _CoreAnyExecuteParams 

112 from ..engine.interfaces import _CoreSingleExecuteParams 

113 from ..engine.interfaces import _ExecuteOptions 

114 from ..engine.interfaces import CoreExecuteOptionsParameter 

115 from ..engine.result import ScalarResult 

116 from ..event import _InstanceLevelDispatch 

117 from ..sql._typing import _ColumnsClauseArgument 

118 from ..sql._typing import _InfoType 

119 from ..sql._typing import _T0 

120 from ..sql._typing import _T1 

121 from ..sql._typing import _T2 

122 from ..sql._typing import _T3 

123 from ..sql._typing import _T4 

124 from ..sql._typing import _T5 

125 from ..sql._typing import _T6 

126 from ..sql._typing import _T7 

127 from ..sql._typing import _TypedColumnClauseArgument as _TCCA 

128 from ..sql.base import Executable 

129 from ..sql.base import ExecutableOption 

130 from ..sql.dml import UpdateBase 

131 from ..sql.elements import ClauseElement 

132 from ..sql.roles import TypedColumnsClauseRole 

133 from ..sql.selectable import ForUpdateParameter 

134 from ..sql.selectable import TypedReturnsRows 

135 

# Generic type variable available to annotations in this module.
_T = TypeVar("_T", bound=Any)

# Names exported from this module by ``from ... import *``.
__all__ = [
    "Session",
    "SessionTransaction",
    "sessionmaker",
    "ORMExecuteState",
    "close_all_sessions",
    "make_transient",
    "make_transient_to_detached",
    "object_session",
]

# Module-wide registry of Session objects keyed by an integer; the values
# are held weakly, so an entry does not keep its Session alive.
_sessions: weakref.WeakValueDictionary[int, Session] = (
    weakref.WeakValueDictionary()
)
"""Weak-referencing dictionary of :class:`.Session` objects.
"""

# Give the state module a reference to the very same dictionary object.
statelib._sessions = _sessions

# A primary key identity: a single value or a tuple of values.
_PKIdentityArgument = Union[Any, Tuple[Any, ...]]

# Free-form dictionary of "bind arguments" (see
# ORMExecuteState.bind_arguments).
_BindArguments = Dict[str, Any]

# Types accepted as keys / values when associating entities with binds.
# NOTE(review): presumably these describe the Session "binds" mapping;
# confirm against the Session constructor, which is outside this chunk.
_EntityBindKey = Union[Type[_O], "Mapper[_O]"]
_SessionBindKey = Union[Type[Any], "Mapper[Any]", "TableClause", str]
_SessionBind = Union["Engine", "Connection"]

# The closed set of string names accepted as a "join transaction mode".
JoinTransactionMode = Literal[
    "conditional_savepoint",
    "rollback_only",
    "control_fully",
    "create_savepoint",
]

171 

172 

class _ConnectionCallableProto(Protocol):
    """A callable that returns a :class:`.Connection` given an instance.

    This callable, when present on a :class:`.Session`, is called only from the
    ORM's persistence mechanism (i.e. the unit of work flush process) to allow
    for connection-per-instance schemes (i.e. horizontal sharding) to be used
    at persistence time.

    This callable is not present on a plain :class:`.Session`, however
    is established when using the horizontal sharding extension.

    """

    # Structural signature only; implementations receive the mapper and/or
    # the instance being persisted, plus arbitrary keyword arguments.
    def __call__(
        self,
        mapper: Optional[Mapper[Any]] = None,
        instance: Optional[object] = None,
        **kw: Any,
    ) -> Connection: ...

192 

193 

def _state_session(state: InstanceState[Any]) -> Optional[Session]:
    """Look up the :class:`.Session` that owns an :class:`.InstanceState`.

    :param state: the instance state to inspect.
    :return: the value of ``state.session``, which is ``None`` when the
     state is not associated with a session.
    """
    owning_session = state.session
    return owning_session

199 

200 

class _SessionClassMethods:
    """Class-level methods for :class:`.Session`, :class:`.sessionmaker`."""

    @classmethod
    @util.deprecated(
        "1.3",
        "The :meth:`.Session.close_all` method is deprecated and will be "
        "removed in a future release. Please refer to "
        ":func:`.session.close_all_sessions`.",
    )
    def close_all(cls) -> None:
        """Close *all* sessions in memory."""
        # thin delegate to the module-level function
        close_all_sessions()

    @classmethod
    @util.preload_module("sqlalchemy.orm.util")
    def identity_key(
        cls,
        class_: Optional[Type[Any]] = None,
        ident: Union[Any, Tuple[Any, ...]] = None,
        *,
        instance: Optional[Any] = None,
        row: Optional[Union[Row[Any], RowMapping]] = None,
        identity_token: Optional[Any] = None,
    ) -> _IdentityKeyType[Any]:
        """Build and return an identity key.

        All arguments are forwarded unchanged; this method is an alias of
        :func:`.util.identity_key`.

        """
        # the module is resolved lazily through the preload registry
        orm_util = util.preloaded.orm_util
        key = orm_util.identity_key(
            class_,
            ident,
            instance=instance,
            row=row,
            identity_token=identity_token,
        )
        return key

    @classmethod
    def object_session(cls, instance: object) -> Optional[Session]:
        """Return the :class:`.Session` that the given object belongs to.

        Alias of the module-level :func:`.object_session` function.

        """
        owning_session = object_session(instance)
        return owning_session

249 

250 

class SessionTransactionState(_StateChangeState):
    # Enumerates the states tracked in ``SessionTransaction._state``.
    # NOTE(review): the meaning of PREPARED / COMMITTED is not shown in
    # this part of the file; confirm against the commit / prepare logic.

    # assigned to ``_state`` when a SessionTransaction is constructed
    ACTIVE = 1
    PREPARED = 2
    COMMITTED = 3
    # reported to users as the 'inactive' state: the SQL transaction was
    # rolled back and no further SQL can be emitted within it
    DEACTIVE = 4
    # reported as "This transaction is closed"
    CLOSED = 5
    # a new connection is being provisioned; concurrent operations are
    # rejected while in this state
    PROVISIONING_CONNECTION = 6

258 

259 

# backwards compatibility: expose every SessionTransactionState member
# under its bare name at module level
ACTIVE = SessionTransactionState.ACTIVE
PREPARED = SessionTransactionState.PREPARED
COMMITTED = SessionTransactionState.COMMITTED
DEACTIVE = SessionTransactionState.DEACTIVE
CLOSED = SessionTransactionState.CLOSED
PROVISIONING_CONNECTION = SessionTransactionState.PROVISIONING_CONNECTION

264 

265 

class ORMExecuteState(util.MemoizedSlots):
    """Represents a call to the :meth:`_orm.Session.execute` method, as passed
    to the :meth:`.SessionEvents.do_orm_execute` event hook.

    .. versionadded:: 1.4

    .. seealso::

        :ref:`session_execute_events` - top level documentation on how
        to use :meth:`_orm.SessionEvents.do_orm_execute`

    """

    __slots__ = (
        "session",
        "statement",
        "parameters",
        "execution_options",
        "local_execution_options",
        "bind_arguments",
        "identity_token",
        "_compile_state_cls",
        "_starting_event_idx",
        "_events_todo",
        "_update_execution_options",
    )

    session: Session
    """The :class:`_orm.Session` in use."""

    statement: Executable
    """The SQL statement being invoked.

    For an ORM selection as would
    be retrieved from :class:`_orm.Query`, this is an instance of
    :class:`_sql.select` that was generated from the ORM query.
    """

    parameters: Optional[_CoreAnyExecuteParams]
    """Dictionary of parameters that was passed to
    :meth:`_orm.Session.execute`."""

    execution_options: _ExecuteOptions
    """The complete dictionary of current execution options.

    This is a merge of the statement level options with the
    locally passed execution options.

    .. seealso::

        :attr:`_orm.ORMExecuteState.local_execution_options`

        :meth:`_sql.Executable.execution_options`

        :ref:`orm_queryguide_execution_options`

    """

    local_execution_options: _ExecuteOptions
    """Dictionary view of the execution options passed to the
    :meth:`.Session.execute` method.

    This does not include options that may be associated with the statement
    being invoked.

    .. seealso::

        :attr:`_orm.ORMExecuteState.execution_options`

    """

    bind_arguments: _BindArguments
    """The dictionary passed as the
    :paramref:`_orm.Session.execute.bind_arguments` dictionary.

    This dictionary may be used by extensions to :class:`_orm.Session` to pass
    arguments that will assist in determining amongst a set of database
    connections which one should be used to invoke this statement.

    """

    _compile_state_cls: Optional[Type[ORMCompileState]]
    _starting_event_idx: int
    _events_todo: List[Any]
    _update_execution_options: Optional[_ExecuteOptions]

    def __init__(
        self,
        session: Session,
        statement: Executable,
        parameters: Optional[_CoreAnyExecuteParams],
        execution_options: _ExecuteOptions,
        bind_arguments: _BindArguments,
        compile_state_cls: Optional[Type[ORMCompileState]],
        events_todo: List[_InstanceLevelDispatch[Session]],
    ):
        """Construct a new :class:`_orm.ORMExecuteState`.

        this object is constructed internally.

        """
        self.session = session
        self.statement = statement
        self.parameters = parameters
        self.local_execution_options = execution_options
        # statement-level options first, locally passed options on top
        self.execution_options = statement._execution_options.union(
            execution_options
        )
        self.bind_arguments = bind_arguments
        self._compile_state_cls = compile_state_cls
        # private copy, so the caller's list is never shared
        self._events_todo = list(events_todo)
        # NOTE(review): ``_starting_event_idx`` is not assigned here;
        # presumably the code that runs the event hooks sets it before
        # ``_remaining_events()`` is used - confirm against the caller.

    def _remaining_events(self) -> List[_InstanceLevelDispatch[Session]]:
        """Return the event handlers located after the current one."""
        return self._events_todo[self._starting_event_idx + 1 :]

    def invoke_statement(
        self,
        statement: Optional[Executable] = None,
        params: Optional[_CoreAnyExecuteParams] = None,
        execution_options: Optional[OrmExecuteOptionsParameter] = None,
        bind_arguments: Optional[_BindArguments] = None,
    ) -> Result[Any]:
        """Execute the statement represented by this
        :class:`.ORMExecuteState`, without re-invoking events that have
        already proceeded.

        This method essentially performs a re-entrant execution of the current
        statement for which the :meth:`.SessionEvents.do_orm_execute` event is
        being currently invoked.    The use case for this is for event handlers
        that want to override how the ultimate
        :class:`_engine.Result` object is returned, such as for schemes that
        retrieve results from an offline cache or which concatenate results
        from multiple executions.

        When the :class:`_engine.Result` object is returned by the actual
        handler function within :meth:`_orm.SessionEvents.do_orm_execute` and
        is propagated to the calling
        :meth:`_orm.Session.execute` method, the remainder of the
        :meth:`_orm.Session.execute` method is preempted and the
        :class:`_engine.Result` object is returned to the caller of
        :meth:`_orm.Session.execute` immediately.

        :param statement: optional statement to be invoked, in place of the
         statement currently represented by :attr:`.ORMExecuteState.statement`.

        :param params: optional dictionary of parameters or list of parameters
         which will be merged into the existing
         :attr:`.ORMExecuteState.parameters` of this :class:`.ORMExecuteState`.
         If no parameters were present on this :class:`.ORMExecuteState`,
         the given dictionary is used as is.

         .. versionchanged:: 2.0 a list of parameter dictionaries is accepted
            for executemany executions.

        :param execution_options: optional dictionary of execution options
         will be merged into the existing
         :attr:`.ORMExecuteState.execution_options` of this
         :class:`.ORMExecuteState`.

        :param bind_arguments: optional dictionary of bind_arguments
         which will be merged amongst the current
         :attr:`.ORMExecuteState.bind_arguments`
         of this :class:`.ORMExecuteState`.

        :return: a :class:`_engine.Result` object with ORM-level results.

        :raises sqlalchemy.exc.InvalidRequestError: for an executemany
         execution, if the number of parameter sets in ``params`` differs
         from the number of parameter sets already present.

        .. seealso::

            :ref:`do_orm_execute_re_executing` - background and examples on the
            appropriate usage of :meth:`_orm.ORMExecuteState.invoke_statement`.


        """

        if statement is None:
            statement = self.statement

        # work on a copy so that self.bind_arguments is left untouched
        _bind_arguments = dict(self.bind_arguments)
        if bind_arguments:
            _bind_arguments.update(bind_arguments)
        # flag carried through the bind arguments for the nested execution
        _bind_arguments["_sa_skip_events"] = True

        _params: Optional[_CoreAnyExecuteParams]
        if params:
            if self.is_executemany:
                _params = []
                exec_many_parameters = cast(
                    "List[Dict[str, Any]]", self.parameters
                )
                # zip_longest pads the shorter side with None, which is
                # how a length mismatch is detected
                for _existing_params, _new_params in itertools.zip_longest(
                    exec_many_parameters,
                    cast("List[Dict[str, Any]]", params),
                ):
                    if _existing_params is None or _new_params is None:
                        raise sa_exc.InvalidRequestError(
                            f"Can't apply executemany parameters to "
                            f"statement; number of parameter sets passed to "
                            f"Session.execute() ({len(exec_many_parameters)}) "
                            f"does not match number of parameter sets given "
                            f"to ORMExecuteState.invoke_statement() "
                            f"({len(params)})"
                        )
                    _existing_params = dict(_existing_params)
                    _existing_params.update(_new_params)
                    _params.append(_existing_params)
            else:
                # ``parameters`` is Optional; start from an empty mapping
                # when it is None so that dict() is not handed None, which
                # would raise TypeError
                _params = dict(
                    cast("Dict[str, Any]", self.parameters or {})
                )
                _params.update(cast("Dict[str, Any]", params))
        else:
            _params = self.parameters

        _execution_options = self.local_execution_options
        if execution_options:
            _execution_options = _execution_options.union(execution_options)

        return self.session._execute_internal(
            statement,
            _params,
            execution_options=_execution_options,
            bind_arguments=_bind_arguments,
            _parent_execute_state=self,
        )

    @property
    def bind_mapper(self) -> Optional[Mapper[Any]]:
        """Return the :class:`_orm.Mapper` that is the primary "bind" mapper.

        For an :class:`_orm.ORMExecuteState` object invoking an ORM
        statement, that is, the :attr:`_orm.ORMExecuteState.is_orm_statement`
        attribute is ``True``, this attribute will return the
        :class:`_orm.Mapper` that is considered to be the "primary" mapper
        of the statement.   The term "bind mapper" refers to the fact that
        a :class:`_orm.Session` object may be "bound" to multiple
        :class:`_engine.Engine` objects keyed to mapped classes, and the
        "bind mapper" determines which of those :class:`_engine.Engine` objects
        would be selected.

        For a statement that is invoked against a single mapped class,
        :attr:`_orm.ORMExecuteState.bind_mapper` is intended to be a reliable
        way of getting this mapper.

        .. versionadded:: 1.4.0b2

        .. seealso::

            :attr:`_orm.ORMExecuteState.all_mappers`


        """
        mp: Optional[Mapper[Any]] = self.bind_arguments.get("mapper", None)
        return mp

    @property
    def all_mappers(self) -> Sequence[Mapper[Any]]:
        """Return a sequence of all :class:`_orm.Mapper` objects that are
        involved at the top level of this statement.

        By "top level" we mean those :class:`_orm.Mapper` objects that would
        be represented in the result set rows for a :func:`_sql.select`
        query, or for a :func:`_dml.update` or :func:`_dml.delete` query,
        the mapper that is the main subject of the UPDATE or DELETE.

        .. versionadded:: 1.4.0b2

        .. seealso::

            :attr:`_orm.ORMExecuteState.bind_mapper`



        """
        if not self.is_orm_statement:
            return []
        elif isinstance(self.statement, (Select, FromStatement)):
            result = []
            # each mapper is reported once, in first-seen order
            seen = set()
            for d in self.statement.column_descriptions:
                ent = d["entity"]
                if ent:
                    insp = inspect(ent, raiseerr=False)
                    if insp and insp.mapper and insp.mapper not in seen:
                        seen.add(insp.mapper)
                        result.append(insp.mapper)
            return result
        elif self.statement.is_dml and self.bind_mapper:
            return [self.bind_mapper]
        else:
            return []

    @property
    def is_orm_statement(self) -> bool:
        """return True if the operation is an ORM statement.

        This indicates that the select(), insert(), update(), or delete()
        being invoked contains ORM entities as subjects.   For a statement
        that does not have ORM entities and instead refers only to
        :class:`.Table` metadata, it is invoked as a Core SQL statement
        and no ORM-level automation takes place.

        """
        return self._compile_state_cls is not None

    @property
    def is_executemany(self) -> bool:
        """return True if the parameters are a list of parameter
        dictionaries, i.e. an "executemany" style of invocation.

        The check is made on the type of
        :attr:`_orm.ORMExecuteState.parameters` only; a list is considered
        to be "executemany" regardless of how many dictionaries it contains.

        .. versionadded:: 2.0

        """
        return isinstance(self.parameters, list)

    @property
    def is_select(self) -> bool:
        """return True if this is a SELECT operation.

        .. versionchanged:: 2.0.30 - the attribute is also True for a
           :meth:`_sql.Select.from_statement` construct that is itself against
           a :class:`_sql.Select` construct, such as
           ``select(Entity).from_statement(select(..))``

        """
        return self.statement.is_select

    @property
    def is_from_statement(self) -> bool:
        """return True if this operation is a
        :meth:`_sql.Select.from_statement` operation.

        This is independent from :attr:`_orm.ORMExecuteState.is_select`, as a
        ``select().from_statement()`` construct can be used with
        INSERT/UPDATE/DELETE RETURNING types of statements as well.
        :attr:`_orm.ORMExecuteState.is_select` will only be set if the
        :meth:`_sql.Select.from_statement` is itself against a
        :class:`_sql.Select` construct.

        .. versionadded:: 2.0.30

        """
        return self.statement.is_from_statement

    @property
    def is_insert(self) -> bool:
        """return True if this is an INSERT operation.

        .. versionchanged:: 2.0.30 - the attribute is also True for a
           :meth:`_sql.Select.from_statement` construct that is itself against
           a :class:`_sql.Insert` construct, such as
           ``select(Entity).from_statement(insert(..))``

        """
        return self.statement.is_dml and self.statement.is_insert

    @property
    def is_update(self) -> bool:
        """return True if this is an UPDATE operation.

        .. versionchanged:: 2.0.30 - the attribute is also True for a
           :meth:`_sql.Select.from_statement` construct that is itself against
           a :class:`_sql.Update` construct, such as
           ``select(Entity).from_statement(update(..))``

        """
        return self.statement.is_dml and self.statement.is_update

    @property
    def is_delete(self) -> bool:
        """return True if this is a DELETE operation.

        .. versionchanged:: 2.0.30 - the attribute is also True for a
           :meth:`_sql.Select.from_statement` construct that is itself against
           a :class:`_sql.Delete` construct, such as
           ``select(Entity).from_statement(delete(..))``

        """
        return self.statement.is_dml and self.statement.is_delete

    @property
    def _is_crud(self) -> bool:
        """True if the statement is an ``Update`` or ``Delete`` construct."""
        return isinstance(self.statement, (dml.Update, dml.Delete))

    def update_execution_options(self, **opts: Any) -> None:
        """Update the local execution options with new values."""
        # only ``local_execution_options`` is replaced here; the merged
        # ``execution_options`` attribute is not recomputed
        self.local_execution_options = self.local_execution_options.union(opts)

    def _orm_compile_options(
        self,
    ) -> Optional[
        Union[
            context.ORMCompileState.default_compile_options,
            Type[context.ORMCompileState.default_compile_options],
        ]
    ]:
        """Return the statement's ORM compile options, or ``None`` if the
        statement is not a SELECT or carries no ORM compile options.
        """
        if not self.is_select:
            return None
        try:
            opts = self.statement._compile_options
        except AttributeError:
            # statement has no compile options at all
            return None

        if opts is not None and opts.isinstance(
            context.ORMCompileState.default_compile_options
        ):
            return opts  # type: ignore
        else:
            return None

    @property
    def lazy_loaded_from(self) -> Optional[InstanceState[Any]]:
        """An :class:`.InstanceState` that is using this statement execution
        for a lazy load operation.

        The primary rationale for this attribute is to support the horizontal
        sharding extension, where it is available within specific query
        execution time hooks created by this extension.   To that end, the
        attribute is only intended to be meaningful at **query execution
        time**, and importantly not any time prior to that, including query
        compilation time.

        As the value is read from
        :attr:`_orm.ORMExecuteState.load_options`, accessing this attribute
        for a statement that is not a SELECT raises
        :class:`sqlalchemy.exc.InvalidRequestError`.

        """
        return self.load_options._lazy_loaded_from

    @property
    def loader_strategy_path(self) -> Optional[PathRegistry]:
        """Return the :class:`.PathRegistry` for the current load path.

        This object represents the "path" in a query along relationships
        when a particular object or collection is being loaded.

        """
        opts = self._orm_compile_options()
        if opts is not None:
            return opts._current_path
        else:
            return None

    @property
    def is_column_load(self) -> bool:
        """Return True if the operation is refreshing column-oriented
        attributes on an existing ORM object.

        This occurs during operations such as :meth:`_orm.Session.refresh`,
        as well as when an attribute deferred by :func:`_orm.defer` is
        being loaded, or an attribute that was expired either directly
        by :meth:`_orm.Session.expire` or via a commit operation is being
        loaded.

        Handlers will very likely not want to add any options to queries
        when such an operation is occurring as the query should be a straight
        primary key fetch which should not have any additional WHERE criteria,
        and loader options travelling with the instance
        will have already been added to the query.

        .. versionadded:: 1.4.0b2

        .. seealso::

            :attr:`_orm.ORMExecuteState.is_relationship_load`

        """
        opts = self._orm_compile_options()
        return opts is not None and opts._for_refresh_state

    @property
    def is_relationship_load(self) -> bool:
        """Return True if this load is loading objects on behalf of a
        relationship.

        This means, the loader in effect is either a LazyLoader,
        SelectInLoader, SubqueryLoader, or similar, and the entire
        SELECT statement being emitted is on behalf of a relationship
        load.

        Handlers will very likely not want to add any options to queries
        when such an operation is occurring, as loader options are already
        capable of being propagated to relationship loaders and should
        be already present.

        .. seealso::

            :attr:`_orm.ORMExecuteState.is_column_load`

        """
        opts = self._orm_compile_options()
        if opts is None:
            return False
        # a non-root load path means the SELECT is for a relationship
        path = self.loader_strategy_path
        return path is not None and not path.is_root

    @property
    def load_options(
        self,
    ) -> Union[
        context.QueryContext.default_load_options,
        Type[context.QueryContext.default_load_options],
    ]:
        """Return the load_options that will be used for this execution.

        :raises sqlalchemy.exc.InvalidRequestError: if the statement is not
         a SELECT.

        """

        if not self.is_select:
            raise sa_exc.InvalidRequestError(
                "This ORM execution is not against a SELECT statement "
                "so there are no load options."
            )

        # fall back to the class-level defaults when no options were set
        lo: Union[
            context.QueryContext.default_load_options,
            Type[context.QueryContext.default_load_options],
        ] = self.execution_options.get(
            "_sa_orm_load_options", context.QueryContext.default_load_options
        )
        return lo

    @property
    def update_delete_options(
        self,
    ) -> Union[
        bulk_persistence.BulkUDCompileState.default_update_options,
        Type[bulk_persistence.BulkUDCompileState.default_update_options],
    ]:
        """Return the update_delete_options that will be used for this
        execution.

        :raises sqlalchemy.exc.InvalidRequestError: if the statement is not
         an UPDATE or DELETE construct.

        """

        if not self._is_crud:
            raise sa_exc.InvalidRequestError(
                "This ORM execution is not against an UPDATE or DELETE "
                "statement so there are no update options."
            )
        # fall back to the class-level defaults when no options were set
        uo: Union[
            bulk_persistence.BulkUDCompileState.default_update_options,
            Type[bulk_persistence.BulkUDCompileState.default_update_options],
        ] = self.execution_options.get(
            "_sa_orm_update_options",
            bulk_persistence.BulkUDCompileState.default_update_options,
        )
        return uo

    @property
    def _non_compile_orm_options(self) -> Sequence[ORMOption]:
        """ORM options on the statement that are not compile-state options."""
        return [
            opt
            for opt in self.statement._with_options
            if is_orm_option(opt) and not opt._is_compile_state
        ]

    @property
    def user_defined_options(self) -> Sequence[UserDefinedOption]:
        """The sequence of :class:`.UserDefinedOptions` that have been
        associated with the statement being invoked.

        """
        return [
            opt
            for opt in self.statement._with_options
            if is_user_defined_option(opt)
        ]

819 

820 

class SessionTransactionOrigin(Enum):
    """Indicates the origin of a :class:`.SessionTransaction`.

    This enumeration is present on the
    :attr:`.SessionTransaction.origin` attribute of any
    :class:`.SessionTransaction` object.

    .. versionadded:: 2.0

    """

    AUTOBEGIN = 0
    """transaction was started by autobegin"""

    BEGIN = 1
    """transaction was started by calling :meth:`_orm.Session.begin`"""

    BEGIN_NESTED = 2
    """transaction was started by :meth:`_orm.Session.begin_nested`"""

    SUBTRANSACTION = 3
    """transaction is an internal "subtransaction" """

843 

844 

845class SessionTransaction(_StateChange, TransactionalContext): 

846 """A :class:`.Session`-level transaction. 

847 

848 :class:`.SessionTransaction` is produced from the 

849 :meth:`_orm.Session.begin` 

850 and :meth:`_orm.Session.begin_nested` methods. It's largely an internal 

851 object that in modern use provides a context manager for session 

852 transactions. 

853 

854 Documentation on interacting with :class:`_orm.SessionTransaction` is 

855 at: :ref:`unitofwork_transaction`. 

856 

857 

858 .. versionchanged:: 1.4 The scoping and API methods to work with the 

859 :class:`_orm.SessionTransaction` object directly have been simplified. 

860 

861 .. seealso:: 

862 

863 :ref:`unitofwork_transaction` 

864 

865 :meth:`.Session.begin` 

866 

867 :meth:`.Session.begin_nested` 

868 

869 :meth:`.Session.rollback` 

870 

871 :meth:`.Session.commit` 

872 

873 :meth:`.Session.in_transaction` 

874 

875 :meth:`.Session.in_nested_transaction` 

876 

877 :meth:`.Session.get_transaction` 

878 

879 :meth:`.Session.get_nested_transaction` 

880 

881 

882 """ 

883 

884 _rollback_exception: Optional[BaseException] = None 

885 

886 _connections: Dict[ 

887 Union[Engine, Connection], Tuple[Connection, Transaction, bool, bool] 

888 ] 

889 session: Session 

890 _parent: Optional[SessionTransaction] 

891 

892 _state: SessionTransactionState 

893 

894 _new: weakref.WeakKeyDictionary[InstanceState[Any], object] 

895 _deleted: weakref.WeakKeyDictionary[InstanceState[Any], object] 

896 _dirty: weakref.WeakKeyDictionary[InstanceState[Any], object] 

897 _key_switches: weakref.WeakKeyDictionary[ 

898 InstanceState[Any], Tuple[Any, Any] 

899 ] 

900 

901 origin: SessionTransactionOrigin 

902 """Origin of this :class:`_orm.SessionTransaction`. 

903 

904 Refers to a :class:`.SessionTransactionOrigin` instance which is an 

905 enumeration indicating the source event that led to constructing 

906 this :class:`_orm.SessionTransaction`. 

907 

908 .. versionadded:: 2.0 

909 

910 """ 

911 

912 nested: bool = False 

913 """Indicates if this is a nested, or SAVEPOINT, transaction. 

914 

915 When :attr:`.SessionTransaction.nested` is True, it is expected 

916 that :attr:`.SessionTransaction.parent` will be present as well, 

917 linking to the enclosing :class:`.SessionTransaction`. 

918 

919 .. seealso:: 

920 

921 :attr:`.SessionTransaction.origin` 

922 

923 """ 

924 

    def __init__(
        self,
        session: Session,
        origin: SessionTransactionOrigin,
        parent: Optional[SessionTransaction] = None,
    ):
        """Create a transaction scope for ``session`` and make it current.

        :param session: the :class:`.Session` that owns this transaction.
        :param origin: the :class:`.SessionTransactionOrigin` value naming
         the event that created this transaction.  ``BEGIN_NESTED`` makes
         this a nested (SAVEPOINT) transaction.
        :param parent: the enclosing :class:`.SessionTransaction`.  Required
         for the ``BEGIN_NESTED`` and ``SUBTRANSACTION`` origins; expected
         to be ``None`` for every other origin.

        :raises InvalidRequestError: if the session's close state is
         ``CLOSED``, or if a nested transaction is requested without a
         parent.

        """
        TransactionalContext._trans_ctx_check(session)

        self.session = session
        # populated lazily by _connection_for_bind()
        self._connections = {}
        self._parent = parent
        self.nested = nested = origin is SessionTransactionOrigin.BEGIN_NESTED
        self.origin = origin

        if session._close_state is _SessionCloseState.CLOSED:
            raise sa_exc.InvalidRequestError(
                "This Session has been permanently closed and is unable "
                "to handle any more transaction requests."
            )

        if nested:
            if not parent:
                raise sa_exc.InvalidRequestError(
                    "Can't start a SAVEPOINT transaction when no existing "
                    "transaction is in progress"
                )

            # remember the session's current nested transaction so that
            # close() can reinstate it when this one ends
            self._previous_nested_transaction = session._nested_transaction
        elif origin is SessionTransactionOrigin.SUBTRANSACTION:
            assert parent is not None
        else:
            assert parent is None

        self._state = SessionTransactionState.ACTIVE

        # may flush the session; see _take_snapshot()
        self._take_snapshot()

        # make sure transaction is assigned before we call the
        # dispatch
        self.session._transaction = self

        self.session.dispatch.after_transaction_create(self.session, self)

967 

968 def _raise_for_prerequisite_state( 

969 self, operation_name: str, state: _StateChangeState 

970 ) -> NoReturn: 

971 if state is SessionTransactionState.DEACTIVE: 

972 if self._rollback_exception: 

973 raise sa_exc.PendingRollbackError( 

974 "This Session's transaction has been rolled back " 

975 "due to a previous exception during flush." 

976 " To begin a new transaction with this Session, " 

977 "first issue Session.rollback()." 

978 f" Original exception was: {self._rollback_exception}", 

979 code="7s2a", 

980 ) 

981 else: 

982 raise sa_exc.InvalidRequestError( 

983 "This session is in 'inactive' state, due to the " 

984 "SQL transaction being rolled back; no further SQL " 

985 "can be emitted within this transaction." 

986 ) 

987 elif state is SessionTransactionState.CLOSED: 

988 raise sa_exc.ResourceClosedError("This transaction is closed") 

989 elif state is SessionTransactionState.PROVISIONING_CONNECTION: 

990 raise sa_exc.InvalidRequestError( 

991 "This session is provisioning a new connection; concurrent " 

992 "operations are not permitted", 

993 code="isce", 

994 ) 

995 else: 

996 raise sa_exc.InvalidRequestError( 

997 f"This session is in '{state.name.lower()}' state; no " 

998 "further SQL can be emitted within this transaction." 

999 ) 

1000 

1001 @property 

1002 def parent(self) -> Optional[SessionTransaction]: 

1003 """The parent :class:`.SessionTransaction` of this 

1004 :class:`.SessionTransaction`. 

1005 

1006 If this attribute is ``None``, indicates this 

1007 :class:`.SessionTransaction` is at the top of the stack, and 

1008 corresponds to a real "COMMIT"/"ROLLBACK" 

1009 block. If non-``None``, then this is either a "subtransaction" 

1010 (an internal marker object used by the flush process) or a 

1011 "nested" / SAVEPOINT transaction. If the 

1012 :attr:`.SessionTransaction.nested` attribute is ``True``, then 

1013 this is a SAVEPOINT, and if ``False``, indicates this a subtransaction. 

1014 

1015 """ 

1016 return self._parent 

1017 

1018 @property 

1019 def is_active(self) -> bool: 

1020 return ( 

1021 self.session is not None 

1022 and self._state is SessionTransactionState.ACTIVE 

1023 ) 

1024 

1025 @property 

1026 def _is_transaction_boundary(self) -> bool: 

1027 return self.nested or not self._parent 

1028 

1029 @_StateChange.declare_states( 

1030 (SessionTransactionState.ACTIVE,), _StateChangeStates.NO_CHANGE 

1031 ) 

1032 def connection( 

1033 self, 

1034 bindkey: Optional[Mapper[Any]], 

1035 execution_options: Optional[_ExecuteOptions] = None, 

1036 **kwargs: Any, 

1037 ) -> Connection: 

1038 bind = self.session.get_bind(bindkey, **kwargs) 

1039 return self._connection_for_bind(bind, execution_options) 

1040 

1041 @_StateChange.declare_states( 

1042 (SessionTransactionState.ACTIVE,), _StateChangeStates.NO_CHANGE 

1043 ) 

1044 def _begin(self, nested: bool = False) -> SessionTransaction: 

1045 return SessionTransaction( 

1046 self.session, 

1047 ( 

1048 SessionTransactionOrigin.BEGIN_NESTED 

1049 if nested 

1050 else SessionTransactionOrigin.SUBTRANSACTION 

1051 ), 

1052 self, 

1053 ) 

1054 

1055 def _iterate_self_and_parents( 

1056 self, upto: Optional[SessionTransaction] = None 

1057 ) -> Iterable[SessionTransaction]: 

1058 current = self 

1059 result: Tuple[SessionTransaction, ...] = () 

1060 while current: 

1061 result += (current,) 

1062 if current._parent is upto: 

1063 break 

1064 elif current._parent is None: 

1065 raise sa_exc.InvalidRequestError( 

1066 "Transaction %s is not on the active transaction list" 

1067 % (upto) 

1068 ) 

1069 else: 

1070 current = current._parent 

1071 

1072 return result 

1073 

1074 def _take_snapshot(self) -> None: 

1075 if not self._is_transaction_boundary: 

1076 parent = self._parent 

1077 assert parent is not None 

1078 self._new = parent._new 

1079 self._deleted = parent._deleted 

1080 self._dirty = parent._dirty 

1081 self._key_switches = parent._key_switches 

1082 return 

1083 

1084 is_begin = self.origin in ( 

1085 SessionTransactionOrigin.BEGIN, 

1086 SessionTransactionOrigin.AUTOBEGIN, 

1087 ) 

1088 if not is_begin and not self.session._flushing: 

1089 self.session.flush() 

1090 

1091 self._new = weakref.WeakKeyDictionary() 

1092 self._deleted = weakref.WeakKeyDictionary() 

1093 self._dirty = weakref.WeakKeyDictionary() 

1094 self._key_switches = weakref.WeakKeyDictionary() 

1095 

    def _restore_snapshot(self, dirty_only: bool = False) -> None:
        """Restore the session state recorded before this transaction began.

        Corresponds to a rollback.  May only be called on a transaction
        boundary.

        :param dirty_only: when true, only states that are modified or
         that are present in this transaction's ``_dirty`` collection are
         expired; when false, every state in the identity map is expired.

        """
        assert self._is_transaction_boundary

        # objects that became pending within the transaction go back to
        # transient
        to_expunge = set(self._new).union(self.session._new)
        self.session._expunge_states(to_expunge, to_transient=True)

        for s, (oldkey, newkey) in self._key_switches.items():
            # we probably can do this conditionally based on
            # if we expunged or not, but safe_discard does that anyway
            self.session.identity_map.safe_discard(s)

            # restore the old key
            s.key = oldkey

            # now restore the object, but only if we didn't expunge
            if s not in to_expunge:
                self.session.identity_map.replace(s)

        # undo deletions recorded on the transaction or still pending on
        # the session
        for s in set(self._deleted).union(self.session._deleted):
            self.session._update_impl(s, revert_deletion=True)

        assert not self.session._deleted

        for s in self.session.identity_map.all_states():
            if not dirty_only or s.modified or s in self._dirty:
                s._expire(s.dict, self.session.identity_map._modified)

1127 

1128 def _remove_snapshot(self) -> None: 

1129 """Remove the restoration state taken before a transaction began. 

1130 

1131 Corresponds to a commit. 

1132 

1133 """ 

1134 assert self._is_transaction_boundary 

1135 

1136 if not self.nested and self.session.expire_on_commit: 

1137 for s in self.session.identity_map.all_states(): 

1138 s._expire(s.dict, self.session.identity_map._modified) 

1139 

1140 statelib.InstanceState._detach_states( 

1141 list(self._deleted), self.session 

1142 ) 

1143 self._deleted.clear() 

1144 elif self.nested: 

1145 parent = self._parent 

1146 assert parent is not None 

1147 parent._new.update(self._new) 

1148 parent._dirty.update(self._dirty) 

1149 parent._deleted.update(self._deleted) 

1150 parent._key_switches.update(self._key_switches) 

1151 

    @_StateChange.declare_states(
        (SessionTransactionState.ACTIVE,), _StateChangeStates.NO_CHANGE
    )
    def _connection_for_bind(
        self,
        bind: _SessionBind,
        execution_options: Optional[CoreExecuteOptionsParameter],
    ) -> Connection:
        """Return the connection used for ``bind``, provisioning it on
        first use.

        If a connection is already registered for ``bind`` it is returned
        as is; a warning is emitted if ``execution_options`` were passed,
        since they are ignored in that case.

        Otherwise the transaction is placed in the
        ``PROVISIONING_CONNECTION`` state while a connection is obtained
        (from the parent transaction if there is one, else from ``bind``
        itself) and a database transaction is begun or joined on it.  The
        state is always returned to ``ACTIVE`` afterwards, including on
        error.

        A non-nested transaction that has a parent returns the parent's
        connection directly and registers nothing of its own.

        :param bind: the engine or connection to obtain a connection for.
        :param execution_options: optional execution options applied to a
         newly provisioned connection.

        :raises InvalidRequestError: if ``bind`` is a connection whose
         engine already has a connection registered on this transaction.

        """
        if bind in self._connections:
            if execution_options:
                util.warn(
                    "Connection is already established for the "
                    "given bind; execution_options ignored"
                )
            return self._connections[bind][0]

        self._state = SessionTransactionState.PROVISIONING_CONNECTION

        # local_connect: this method itself checked the connection out of
        # an engine, so it must close it if setup fails.
        # should_commit: whether commit() / close() act on the transaction
        # recorded for this connection.
        local_connect = False
        should_commit = True

        try:
            if self._parent:
                conn = self._parent._connection_for_bind(
                    bind, execution_options
                )
                if not self.nested:
                    return conn
            else:
                if isinstance(bind, engine.Connection):
                    conn = bind
                    if conn.engine in self._connections:
                        raise sa_exc.InvalidRequestError(
                            "Session already has a Connection associated "
                            "for the given Connection's Engine"
                        )
                else:
                    conn = bind.connect()
                    local_connect = True

            try:
                if execution_options:
                    conn = conn.execution_options(**execution_options)

                transaction: Transaction
                if self.session.twophase and self._parent is None:
                    # TODO: shouldn't we only be here if not
                    # conn.in_transaction() ?
                    # if twophase is set and conn.in_transaction(), validate
                    # that it is in fact twophase.
                    transaction = conn.begin_twophase()
                elif self.nested:
                    transaction = conn.begin_nested()
                elif conn.in_transaction():
                    # the connection already has a transaction in progress;
                    # how it is joined depends on join_transaction_mode
                    join_transaction_mode = self.session.join_transaction_mode

                    if join_transaction_mode == "conditional_savepoint":
                        # resolve the conditional mode to a concrete one
                        if conn.in_nested_transaction():
                            join_transaction_mode = "create_savepoint"
                        else:
                            join_transaction_mode = "rollback_only"

                        if local_connect:
                            util.warn(
                                "The engine provided as bind produced a "
                                "connection that is already in a transaction. "
                                "This is usually caused by a core event, "
                                "such as 'engine_connect', that has left a "
                                "transaction open. The effective join "
                                "transaction mode used by this session is "
                                f"{join_transaction_mode!r}. To silence this "
                                "warning, do not leave transactions open"
                            )
                    if join_transaction_mode in (
                        "control_fully",
                        "rollback_only",
                    ):
                        # adopt the transaction that is already present
                        if conn.in_nested_transaction():
                            transaction = (
                                conn._get_required_nested_transaction()
                            )
                        else:
                            transaction = conn._get_required_transaction()
                        if join_transaction_mode == "rollback_only":
                            should_commit = False
                    elif join_transaction_mode == "create_savepoint":
                        transaction = conn.begin_nested()
                    else:
                        assert False, join_transaction_mode
                else:
                    transaction = conn.begin()
            except:
                # connection will not be associated with this Session;
                # close it immediately so that it isn't closed under GC
                if local_connect:
                    conn.close()
                raise
            else:
                bind_is_connection = isinstance(bind, engine.Connection)

                # register the same record under both the connection and
                # its engine; the last element is the "autoclose" flag
                # consulted by close()
                self._connections[conn] = self._connections[conn.engine] = (
                    conn,
                    transaction,
                    should_commit,
                    not bind_is_connection,
                )
                self.session.dispatch.after_begin(self.session, self, conn)
                return conn
        finally:
            self._state = SessionTransactionState.ACTIVE

1262 

1263 def prepare(self) -> None: 

1264 if self._parent is not None or not self.session.twophase: 

1265 raise sa_exc.InvalidRequestError( 

1266 "'twophase' mode not enabled, or not root transaction; " 

1267 "can't prepare." 

1268 ) 

1269 self._prepare_impl() 

1270 

    @_StateChange.declare_states(
        (SessionTransactionState.ACTIVE,), SessionTransactionState.PREPARED
    )
    def _prepare_impl(self) -> None:
        """Bring this transaction to the ``PREPARED`` state.

        Fires ``before_commit`` for a root or nested transaction, commits
        any transactions stacked above this one, flushes the session until
        it is clean and, for a root transaction in two-phase mode, calls
        ``prepare()`` on each connection's transaction.

        :raises FlushError: if the session is still not clean after 100
         consecutive flushes.

        """
        if self._parent is None or self.nested:
            self.session.dispatch.before_commit(self.session)

        stx = self.session._transaction
        assert stx is not None
        if stx is not self:
            # commit whatever is stacked between the session's current
            # transaction and this one
            for subtransaction in stx._iterate_self_and_parents(upto=self):
                subtransaction.commit()

        if not self.session._flushing:
            # flush repeatedly until the session is clean; the bound
            # guards against a flush that keeps producing new work
            for _flush_guard in range(100):
                if self.session._is_clean():
                    break
                self.session.flush()
            else:
                raise exc.FlushError(
                    "Over 100 subsequent flushes have occurred within "
                    "session.commit() - is an after_flush() hook "
                    "creating new objects?"
                )

        if self._parent is None and self.session.twophase:
            try:
                # each connection is registered under two keys, so
                # de-duplicate the records before preparing
                for t in set(self._connections.values()):
                    cast("TwoPhaseTransaction", t[1]).prepare()
            except:
                # roll back, then re-raise the original error
                with util.safe_reraise():
                    self.rollback()

        self._state = SessionTransactionState.PREPARED

1305 

    @_StateChange.declare_states(
        (SessionTransactionState.ACTIVE, SessionTransactionState.PREPARED),
        SessionTransactionState.CLOSED,
    )
    def commit(self, _to_root: bool = False) -> None:
        """Commit this transaction and close it.

        The transaction is prepared first if it is not already in the
        ``PREPARED`` state.  For a root or nested transaction, each
        registered database transaction whose ``should_commit`` flag is set
        is committed, ``after_commit`` is fired and the snapshot is
        removed.  The transaction is then closed.

        :param _to_root: when true, the parent transaction (if any) is
         committed afterwards as well, recursively.

        """
        if self._state is not SessionTransactionState.PREPARED:
            with self._expect_state(SessionTransactionState.PREPARED):
                self._prepare_impl()

        if self._parent is None or self.nested:
            # records are registered under two keys each; set()
            # de-duplicates them
            for conn, trans, should_commit, autoclose in set(
                self._connections.values()
            ):
                if should_commit:
                    trans.commit()

            self._state = SessionTransactionState.COMMITTED
            self.session.dispatch.after_commit(self.session)

            self._remove_snapshot()

        with self._expect_state(SessionTransactionState.CLOSED):
            self.close()

        if _to_root and self._parent:
            self._parent.commit(_to_root=True)

1332 

    @_StateChange.declare_states(
        (
            SessionTransactionState.ACTIVE,
            SessionTransactionState.DEACTIVE,
            SessionTransactionState.PREPARED,
        ),
        SessionTransactionState.CLOSED,
    )
    def rollback(
        self, _capture_exception: bool = False, _to_root: bool = False
    ) -> None:
        """Roll back this transaction and close it.

        Transactions stacked above this one are closed first.  If this
        transaction is ``ACTIVE`` or ``PREPARED``, the chain of self and
        parents is walked up to the nearest root or nested transaction:
        transactions passed on the way are marked ``DEACTIVE``, and on the
        boundary the database transactions are rolled back and the
        snapshot is restored.

        :param _capture_exception: when true and a parent exists, the
         exception currently being handled (per ``sys.exc_info()``) is
         stored on the parent as ``_rollback_exception``.
        :param _to_root: when true, the parent transaction (if any) is
         rolled back afterwards as well, recursively.

        An exception raised while rolling back the database transactions
        is held until this transaction has been closed and is then
        re-raised with its original traceback.

        """
        stx = self.session._transaction
        assert stx is not None
        if stx is not self:
            for subtransaction in stx._iterate_self_and_parents(upto=self):
                subtransaction.close()

        boundary = self
        rollback_err = None
        if self._state in (
            SessionTransactionState.ACTIVE,
            SessionTransactionState.PREPARED,
        ):
            for transaction in self._iterate_self_and_parents():
                if transaction._parent is None or transaction.nested:
                    try:
                        for t in set(transaction._connections.values()):
                            t[1].rollback()

                        transaction._state = SessionTransactionState.DEACTIVE
                        self.session.dispatch.after_rollback(self.session)
                    except:
                        # hold the error; it is re-raised after close()
                        rollback_err = sys.exc_info()
                    finally:
                        # runs whether or not the rollback succeeded
                        transaction._state = SessionTransactionState.DEACTIVE
                        transaction._restore_snapshot(
                            dirty_only=transaction.nested
                        )
                    boundary = transaction
                    break
                else:
                    transaction._state = SessionTransactionState.DEACTIVE

        sess = self.session

        if not rollback_err and not sess._is_clean():
            # if items were added, deleted, or mutated
            # here, we need to re-restore the snapshot
            util.warn(
                "Session's state has been changed on "
                "a non-active transaction - this state "
                "will be discarded."
            )
            boundary._restore_snapshot(dirty_only=boundary.nested)

        with self._expect_state(SessionTransactionState.CLOSED):
            self.close()

        if self._parent and _capture_exception:
            self._parent._rollback_exception = sys.exc_info()[1]

        if rollback_err and rollback_err[1]:
            raise rollback_err[1].with_traceback(rollback_err[2])

        sess.dispatch.after_soft_rollback(sess, self)

        if _to_root and self._parent:
            self._parent.rollback(_to_root=True)

1401 

    @_StateChange.declare_states(
        _StateChangeStates.ANY, SessionTransactionState.CLOSED
    )
    def close(self, invalidate: bool = False) -> None:
        """Close this transaction and hand the session back to the parent.

        The session's current transaction becomes this transaction's
        parent; for a nested transaction the session's previous nested
        transaction is reinstated as well.  The ``after_transaction_end``
        event is fired last.

        :param invalidate: when true and this is a root transaction, each
         registered connection is invalidated.

        """
        if self.nested:
            self.session._nested_transaction = (
                self._previous_nested_transaction
            )

        self.session._transaction = self._parent

        # records are registered under two keys each; set() de-duplicates
        for connection, transaction, should_commit, autoclose in set(
            self._connections.values()
        ):
            if invalidate and self._parent is None:
                connection.invalidate()
            if should_commit and transaction.is_active:
                transaction.close()
            # only the root transaction closes connections, and only
            # those flagged "autoclose" when they were registered
            if autoclose and self._parent is None:
                connection.close()

        self._state = SessionTransactionState.CLOSED
        sess = self.session

        # TODO: these two None sets were historically after the
        # event hook below, and in 2.0 I changed it this way for some reason,
        # and I remember there being a reason, but not what it was.
        # Why do we need to get rid of them at all?  test_memusage::CycleTest
        # passes with these commented out.
        # self.session = None  # type: ignore
        # self._connections = None  # type: ignore

        sess.dispatch.after_transaction_end(sess, self)

1435 

1436 def _get_subject(self) -> Session: 

1437 return self.session 

1438 

1439 def _transaction_is_active(self) -> bool: 

1440 return self._state is SessionTransactionState.ACTIVE 

1441 

1442 def _transaction_is_closed(self) -> bool: 

1443 return self._state is SessionTransactionState.CLOSED 

1444 

1445 def _rollback_can_be_called(self) -> bool: 

1446 return self._state not in (COMMITTED, CLOSED) 

1447 

1448 

1449class _SessionCloseState(Enum): 

1450 ACTIVE = 1 

1451 CLOSED = 2 

1452 CLOSE_IS_RESET = 3 

1453 

1454 

1455class Session(_SessionClassMethods, EventTarget): 

1456 """Manages persistence operations for ORM-mapped objects. 

1457 

1458 The :class:`_orm.Session` is **not safe for use in concurrent threads.**. 

1459 See :ref:`session_faq_threadsafe` for background. 

1460 

1461 The Session's usage paradigm is described at :doc:`/orm/session`. 

1462 

1463 

1464 """ 

1465 

1466 _is_asyncio = False 

1467 

1468 dispatch: dispatcher[Session] 

1469 

1470 identity_map: IdentityMap 

1471 """A mapping of object identities to objects themselves. 

1472 

1473 Iterating through ``Session.identity_map.values()`` provides 

1474 access to the full set of persistent objects (i.e., those 

1475 that have row identity) currently in the session. 

1476 

1477 .. seealso:: 

1478 

1479 :func:`.identity_key` - helper function to produce the keys used 

1480 in this dictionary. 

1481 

1482 """ 

1483 

1484 _new: Dict[InstanceState[Any], Any] 

1485 _deleted: Dict[InstanceState[Any], Any] 

1486 bind: Optional[Union[Engine, Connection]] 

1487 __binds: Dict[_SessionBindKey, _SessionBind] 

1488 _flushing: bool 

1489 _warn_on_events: bool 

1490 _transaction: Optional[SessionTransaction] 

1491 _nested_transaction: Optional[SessionTransaction] 

1492 hash_key: int 

1493 autoflush: bool 

1494 expire_on_commit: bool 

1495 enable_baked_queries: bool 

1496 twophase: bool 

1497 join_transaction_mode: JoinTransactionMode 

1498 _query_cls: Type[Query[Any]] 

1499 _close_state: _SessionCloseState 

1500 

    def __init__(
        self,
        bind: Optional[_SessionBind] = None,
        *,
        autoflush: bool = True,
        future: Literal[True] = True,
        expire_on_commit: bool = True,
        autobegin: bool = True,
        twophase: bool = False,
        binds: Optional[Dict[_SessionBindKey, _SessionBind]] = None,
        enable_baked_queries: bool = True,
        info: Optional[_InfoType] = None,
        query_cls: Optional[Type[Query[Any]]] = None,
        autocommit: Literal[False] = False,
        join_transaction_mode: JoinTransactionMode = "conditional_savepoint",
        close_resets_only: Union[bool, _NoArg] = _NoArg.NO_ARG,
    ):
        r"""Construct a new :class:`_orm.Session`.

        See also the :class:`.sessionmaker` function which is used to
        generate a :class:`.Session`-producing callable with a given
        set of arguments.

        :param autoflush: When ``True``, all query operations will issue a
           :meth:`~.Session.flush` call to this ``Session`` before proceeding.
           This is a convenience feature so that :meth:`~.Session.flush` need
           not be called repeatedly in order for database queries to retrieve
           results.

           .. seealso::

               :ref:`session_flushing` - additional background on autoflush

        :param autobegin: Automatically start transactions (i.e. equivalent to
           invoking :meth:`_orm.Session.begin`) when database access is
           requested by an operation.  Defaults to ``True``.  Set to
           ``False`` to prevent a :class:`_orm.Session` from implicitly
           beginning transactions after construction, as well as after any of
           the :meth:`_orm.Session.rollback`, :meth:`_orm.Session.commit`,
           or :meth:`_orm.Session.close` methods are called.

           .. versionadded:: 2.0

           .. seealso::

                :ref:`session_autobegin_disable`

        :param bind: An optional :class:`_engine.Engine` or
           :class:`_engine.Connection` to
           which this ``Session`` should be bound. When specified, all SQL
           operations performed by this session will execute via this
           connectable.

        :param binds: A dictionary which may specify any number of
           :class:`_engine.Engine` or :class:`_engine.Connection`
           objects as the source of
           connectivity for SQL operations on a per-entity basis.   The keys
           of the dictionary consist of any series of mapped classes,
           arbitrary Python classes that are bases for mapped classes,
           :class:`_schema.Table` objects and :class:`_orm.Mapper` objects.
           The
           values of the dictionary are then instances of
           :class:`_engine.Engine`
           or less commonly :class:`_engine.Connection` objects.
           Operations which
           proceed relative to a particular mapped class will consult this
           dictionary for the closest matching entity in order to determine
           which :class:`_engine.Engine` should be used for a particular SQL
           operation.    The complete heuristics for resolution are
           described at :meth:`.Session.get_bind`.  Usage looks like::

            Session = sessionmaker(
                binds={
                    SomeMappedClass: create_engine("postgresql+psycopg2://engine1"),
                    SomeDeclarativeBase: create_engine(
                        "postgresql+psycopg2://engine2"
                    ),
                    some_mapper: create_engine("postgresql+psycopg2://engine3"),
                    some_table: create_engine("postgresql+psycopg2://engine4"),
                }
            )

           .. seealso::

                :ref:`session_partitioning`

                :meth:`.Session.bind_mapper`

                :meth:`.Session.bind_table`

                :meth:`.Session.get_bind`


        :param \class_: Specify an alternate class other than
           ``sqlalchemy.orm.session.Session`` which should be used by the
           returned class. This is the only argument that is local to the
           :class:`.sessionmaker` function, and is not sent directly to the
           constructor for ``Session``.

        :param enable_baked_queries: legacy; defaults to ``True``.
           A parameter consumed
           by the :mod:`sqlalchemy.ext.baked` extension to determine if
           "baked queries" should be cached, as is the normal operation
           of this extension.  When set to ``False``, caching as used by
           this particular extension is disabled.

           .. versionchanged:: 1.4 The ``sqlalchemy.ext.baked`` extension is
              legacy and is not used by any of SQLAlchemy's internals. This
              flag therefore only affects applications that are making explicit
              use of this extension within their own code.

        :param expire_on_commit:  Defaults to ``True``. When ``True``, all
           instances will be fully expired after each :meth:`~.commit`,
           so that all attribute/object access subsequent to a completed
           transaction will load from the most recent database state.

            .. seealso::

                :ref:`session_committing`

        :param future: Deprecated; this flag is always True.

          .. seealso::

            :ref:`migration_20_toplevel`

        :param info: optional dictionary of arbitrary data to be associated
           with this :class:`.Session`.  Is available via the
           :attr:`.Session.info` attribute.  Note the dictionary is copied at
           construction time so that modifications to the per-
           :class:`.Session` dictionary will be local to that
           :class:`.Session`.

        :param query_cls:  Class which should be used to create new Query
          objects, as returned by the :meth:`~.Session.query` method.
          Defaults to :class:`_query.Query`.

        :param twophase:  When ``True``, all transactions will be started as
            a "two phase" transaction, i.e. using the "two phase" semantics
            of the database in use along with an XID.  During a
            :meth:`~.commit`, after :meth:`~.flush` has been issued for all
            attached databases, the :meth:`~.TwoPhaseTransaction.prepare`
            method on each database's :class:`.TwoPhaseTransaction` will be
            called. This allows each database to roll back the entire
            transaction, before each transaction is committed.

        :param autocommit: the "autocommit" keyword is present for backwards
            compatibility but must remain at its default value of ``False``.

        :param join_transaction_mode: Describes the transactional behavior to
          take when a given bind is a :class:`_engine.Connection` that
          has already begun a transaction outside the scope of this
          :class:`_orm.Session`; in other words the
          :meth:`_engine.Connection.in_transaction()` method returns True.

          The following behaviors only take effect when the :class:`_orm.Session`
          **actually makes use of the connection given**; that is, a method
          such as :meth:`_orm.Session.execute`, :meth:`_orm.Session.connection`,
          etc. are actually invoked:

          * ``"conditional_savepoint"`` - this is the default.  If the given
            :class:`_engine.Connection` is begun within a transaction but
            does not have a SAVEPOINT, then ``"rollback_only"`` is used.
            If the :class:`_engine.Connection` is additionally within
            a SAVEPOINT, in other words
            :meth:`_engine.Connection.in_nested_transaction()` method returns
            True, then ``"create_savepoint"`` is used.

            ``"conditional_savepoint"`` behavior attempts to make use of
            savepoints in order to keep the state of the existing transaction
            unchanged, but only if there is already a savepoint in progress;
            otherwise, it is not assumed that the backend in use has adequate
            support for SAVEPOINT, as availability of this feature varies.
            ``"conditional_savepoint"`` also seeks to establish approximate
            backwards compatibility with previous :class:`_orm.Session`
            behavior, for applications that are not setting a specific mode. It
            is recommended that one of the explicit settings be used.

          * ``"create_savepoint"`` - the :class:`_orm.Session` will use
            :meth:`_engine.Connection.begin_nested()` in all cases to create
            its own transaction.  This transaction by its nature rides
            "on top" of any existing transaction that's opened on the given
            :class:`_engine.Connection`; if the underlying database and
            the driver in use has full, non-broken support for SAVEPOINT, the
            external transaction will remain unaffected throughout the
            lifespan of the :class:`_orm.Session`.

            The ``"create_savepoint"`` mode is the most useful for integrating
            a :class:`_orm.Session` into a test suite where an externally
            initiated transaction should remain unaffected; however, it relies
            on proper SAVEPOINT support from the underlying driver and
            database.

            .. tip:: When using SQLite, the SQLite driver included through
               Python 3.11 does not handle SAVEPOINTs correctly in all cases
               without workarounds. See the sections
               :ref:`pysqlite_serializable` and :ref:`aiosqlite_serializable`
               for details on current workarounds.

          * ``"control_fully"`` - the :class:`_orm.Session` will take
            control of the given transaction as its own;
            :meth:`_orm.Session.commit` will call ``.commit()`` on the
            transaction, :meth:`_orm.Session.rollback` will call
            ``.rollback()`` on the transaction, :meth:`_orm.Session.close` will
            call ``.rollback`` on the transaction.

            .. tip:: This mode of use is equivalent to how SQLAlchemy 1.4 would
               handle a :class:`_engine.Connection` given with an existing
               SAVEPOINT (i.e. :meth:`_engine.Connection.begin_nested`); the
               :class:`_orm.Session` would take full control of the existing
               SAVEPOINT.

          * ``"rollback_only"`` - the :class:`_orm.Session` will take control
            of the given transaction for ``.rollback()`` calls only;
            ``.commit()`` calls will not be propagated to the given
            transaction.  ``.close()`` calls will have no effect on the
            given transaction.

            .. tip:: This mode of use is equivalent to how SQLAlchemy 1.4 would
               handle a :class:`_engine.Connection` given with an existing
               regular database transaction (i.e.
               :meth:`_engine.Connection.begin`); the :class:`_orm.Session`
               would propagate :meth:`_orm.Session.rollback` calls to the
               underlying transaction, but not :meth:`_orm.Session.commit` or
               :meth:`_orm.Session.close` calls.

          .. versionadded:: 2.0.0rc1

        :param close_resets_only: Defaults to ``True``. Determines if
          the session should reset itself after calling ``.close()``
          or should pass in a no longer usable state, disabling re-use.

          .. versionadded:: 2.0.22 added flag ``close_resets_only``.
            A future SQLAlchemy version may change the default value of
            this flag to ``False``.

          .. seealso::

            :ref:`session_closing` - Detail on the semantics of
            :meth:`_orm.Session.close` and :meth:`_orm.Session.reset`.

        :raises ArgumentError: if ``autocommit`` is true, if ``future`` is
          false, or if ``join_transaction_mode`` is not one of the accepted
          values.

        """  # noqa

        # considering allowing the "autocommit" keyword to still be accepted
        # as long as it's False, so that external test suites, oslo.db etc
        # continue to function as the argument appears to be passed in lots
        # of cases including in our own test suite
        if autocommit:
            raise sa_exc.ArgumentError(
                "autocommit=True is no longer supported"
            )
        self.identity_map = identity.WeakInstanceDict()

        if not future:
            raise sa_exc.ArgumentError(
                "The 'future' parameter passed to "
                "Session() may only be set to True."
            )

        self._new = {}  # InstanceState->object, strong refs object
        self._deleted = {}  # same
        self.bind = bind
        self.__binds = {}
        self._flushing = False
        self._warn_on_events = False
        self._transaction = None
        self._nested_transaction = None
        self.hash_key = _new_sessionid()
        self.autobegin = autobegin
        self.autoflush = autoflush
        self.expire_on_commit = expire_on_commit
        self.enable_baked_queries = enable_baked_queries

        # the idea is that at some point NO_ARG will warn that in the future
        # the default will switch to close_resets_only=False.
        if close_resets_only in (True, _NoArg.NO_ARG):
            self._close_state = _SessionCloseState.CLOSE_IS_RESET
        else:
            self._close_state = _SessionCloseState.ACTIVE
        # a falsy join_transaction_mode skips validation and is stored as-is
        if (
            join_transaction_mode
            and join_transaction_mode
            not in JoinTransactionMode.__args__  # type: ignore
        ):
            raise sa_exc.ArgumentError(
                f"invalid selection for join_transaction_mode: "
                f'"{join_transaction_mode}"'
            )
        self.join_transaction_mode = join_transaction_mode

        self.twophase = twophase
        self._query_cls = query_cls if query_cls else query.Query
        if info:
            # copy into the session's own dictionary rather than keeping a
            # reference to the caller's
            self.info.update(info)

        if binds is not None:
            # note: the loop variable rebinds the local name ``bind``;
            # self.bind was already assigned above and is unaffected
            for key, bind in binds.items():
                self._add_bind(key, bind)

        # register this session in the module-level ``_sessions`` mapping
        _sessions[self.hash_key] = self

1801 

1802 # used by sqlalchemy.engine.util.TransactionalContext 

1803 _trans_context_manager: Optional[TransactionalContext] = None 

1804 

1805 connection_callable: Optional[_ConnectionCallableProto] = None 

1806 

1807 def __enter__(self: _S) -> _S: 

1808 return self 

1809 

1810 def __exit__(self, type_: Any, value: Any, traceback: Any) -> None: 

1811 self.close() 

1812 

1813 @contextlib.contextmanager 

1814 def _maker_context_manager(self: _S) -> Iterator[_S]: 

1815 with self: 

1816 with self.begin(): 

1817 yield self 

1818 

def in_transaction(self) -> bool:
    """Report whether this :class:`_orm.Session` currently has a
    transaction begun.

    The answer is ``True`` exactly when the session's current-transaction
    slot is populated.

    .. versionadded:: 1.4

    .. seealso::

        :attr:`_orm.Session.is_active`

    """
    current = self._transaction
    return current is not None

1831 

def in_nested_transaction(self) -> bool:
    """Report whether this :class:`_orm.Session` currently has a nested
    transaction (e.g. a SAVEPOINT) begun.

    .. versionadded:: 1.4

    """
    nested = self._nested_transaction
    return nested is not None

1840 

def get_transaction(self) -> Optional[SessionTransaction]:
    """Return the root transaction currently in progress, or ``None``.

    Starting from the session's current transaction, the ``_parent``
    links are followed until a transaction without a parent is reached.

    .. versionadded:: 1.4

    """
    current = self._transaction
    if current is None:
        return None

    # climb from the innermost transaction up to the outermost one
    while current._parent is not None:
        current = current._parent
    return current

1851 

def get_nested_transaction(self) -> Optional[SessionTransaction]:
    """Return the nested transaction currently in progress, or ``None``
    when there is none.

    .. versionadded:: 1.4

    """
    nested = self._nested_transaction
    return nested

1860 

1861 @util.memoized_property 

def info(self) -> _InfoType:
    """A dictionary that user code is free to modify.

    Initial contents may be supplied through the ``info`` argument of the
    :class:`.Session` constructor, or of the :class:`.sessionmaker`
    constructor or factory methods.  The dictionary belongs to this
    :class:`.Session` alone and can be changed independently of every
    other :class:`.Session` object.

    """
    # a brand-new, empty mapping is built on each invocation of this function
    return dict()

1873 

1874 def _autobegin_t(self, begin: bool = False) -> SessionTransaction: 

1875 if self._transaction is None: 

1876 if not begin and not self.autobegin: 

1877 raise sa_exc.InvalidRequestError( 

1878 "Autobegin is disabled on this Session; please call " 

1879 "session.begin() to start a new transaction" 

1880 ) 

1881 trans = SessionTransaction( 

1882 self, 

1883 ( 

1884 SessionTransactionOrigin.BEGIN 

1885 if begin 

1886 else SessionTransactionOrigin.AUTOBEGIN 

1887 ), 

1888 ) 

1889 assert self._transaction is trans 

1890 return trans 

1891 

1892 return self._transaction 

1893 

def begin(self, nested: bool = False) -> SessionTransaction:
    """Begin a transaction, or a nested transaction, on this
    :class:`.Session`.

    Because the :class:`_orm.Session` has **autobegin** behavior, calling
    :meth:`_orm.Session.begin` explicitly is normally unnecessary; it is
    useful for controlling exactly where transactional state starts.

    Asking for the outermost transaction while one is already in progress
    is an error.

    :param nested: if True, begins a SAVEPOINT transaction; equivalent to
     calling :meth:`~.Session.begin_nested`.  SAVEPOINT usage is
     documented at :ref:`session_begin_nested`.

    :return: the :class:`.SessionTransaction` object.  It acts as a
     Python context manager, so :meth:`.Session.begin` may be used in a
     "with" block; see :ref:`session_explicit_begin` for an example.

    :raises sqlalchemy.exc.InvalidRequestError: if ``nested`` is false and
     a transaction is already begun.

    .. seealso::

        :ref:`session_autobegin`

        :ref:`unitofwork_transaction`

        :meth:`.Session.begin_nested`

    """
    current = self._transaction
    if current is None:
        current = self._autobegin_t(begin=True)
        if not nested:
            # freshly started outermost transaction; nothing more to do
            return current
    elif not nested:
        raise sa_exc.InvalidRequestError(
            "A transaction is already begun on this Session."
        )

    assert current is not None

    # nested was requested: open a SAVEPOINT underneath ``current``
    savepoint = current._begin(nested=nested)
    assert self._transaction is savepoint
    self._nested_transaction = savepoint

    return savepoint  # needed for __enter__/__exit__ hook

1948 

def begin_nested(self) -> SessionTransaction:
    """Begin a "nested" transaction (e.g. a SAVEPOINT) on this Session.

    This is shorthand for ``self.begin(nested=True)``.

    SQL SAVEPOINT must be supported by the target database(s) and their
    drivers for this to work; see :ref:`session_begin_nested` for
    documentation on SAVEPOINT transactions.

    :return: the :class:`.SessionTransaction` object, which acts as a
     context manager so that :meth:`.Session.begin_nested` can be used in
     a "with" block.  See :ref:`session_begin_nested` for a usage example.

    .. seealso::

        :ref:`session_begin_nested`

        :ref:`pysqlite_serializable` - special workarounds required
        with the SQLite driver in order for SAVEPOINT to work
        correctly. For asyncio use cases, see the section
        :ref:`aiosqlite_serializable`.

    """
    savepoint = self.begin(nested=True)
    return savepoint

1974 

def rollback(self) -> None:
    """Roll back the transaction currently in progress.

    With no transaction in progress the call does nothing.

    Otherwise the rollback is requested all the way to the root
    (``_to_root=True``), so the topmost database transaction is rolled
    back and any nested transactions in progress are discarded.

    .. seealso::

        :ref:`session_rollback`

        :ref:`unitofwork_transaction`

    """
    trans = self._transaction
    if trans is not None:
        trans.rollback(_to_root=True)

1995 

def commit(self) -> None:
    """Flush pending changes and commit the current transaction.

    Once the COMMIT completes, all objects are fully :term:`expired`:
    their internal contents are erased and reloaded automatically on next
    access.  Until then they are in an expired state and will not
    function if :term:`detached` from the :class:`.Session`; this reload
    is also unsupported with asyncio-oriented APIs.  Use
    :paramref:`.Session.expire_on_commit` to turn this behavior off.

    If the :class:`.Session` has no transaction in place (no operations
    were invoked since the previous :meth:`.Session.commit`), an
    internal-only "logical" transaction is begun and committed.  It does
    not normally affect the database unless pending flush changes were
    detected, but event handlers and object expiration rules still run.

    The commit is requested all the way to the root (``_to_root=True``):
    the outermost database transaction is committed unconditionally and
    any SAVEPOINTs in effect are released.

    .. seealso::

        :ref:`session_committing`

        :ref:`unitofwork_transaction`

        :ref:`asyncio_orm_avoid_lazyloads`

    """
    active = self._transaction
    if active is None:
        # nothing begun yet: obtain the "logical" transaction via autobegin
        active = self._autobegin_t()

    active.commit(_to_root=True)

2033 

def prepare(self) -> None:
    """Prepare the current transaction in progress for two phase commit.

    When no transaction is in progress, one is first obtained through the
    session's autobegin step, which raises
    :exc:`~sqlalchemy.exc.InvalidRequestError` if autobegin is disabled.

    Only root transactions of two phase sessions can be prepared.  If the
    current transaction is not such, an
    :exc:`~sqlalchemy.exc.InvalidRequestError` is raised.

    """
    active = self._transaction
    if active is None:
        active = self._autobegin_t()

    active.prepare()

2050 

def connection(
    self,
    bind_arguments: Optional[_BindArguments] = None,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Connection:
    r"""Return a :class:`_engine.Connection` object corresponding to this
    :class:`.Session` object's transactional state.

    Either the :class:`_engine.Connection` corresponding to the current
    transaction is returned, or if no transaction is in progress, a new
    one is begun and the :class:`_engine.Connection`
    returned (note that no
    transactional state is established with the DBAPI until the first
    SQL statement is emitted).

    Ambiguity in multi-bind or unbound :class:`.Session` objects can be
    resolved through any of the optional keyword arguments.  This
    ultimately makes usage of the :meth:`.get_bind` method for resolution.

    :param bind_arguments: dictionary of bind arguments.  May include
     "mapper", "bind", "clause", other custom arguments that are passed
     to :meth:`.Session.get_bind`.  A non-``None`` ``"bind"`` entry is
     used directly, without consulting :meth:`.Session.get_bind`.  The
     dictionary passed in is not modified.

    :param execution_options: a dictionary of execution options that will
     be passed to :meth:`_engine.Connection.execution_options`, **when the
     connection is first procured only**.  If the connection is already
     present within the :class:`.Session`, a warning is emitted and
     the arguments are ignored.

    :return: the :class:`_engine.Connection` for the resolved bind.

    .. seealso::

        :ref:`session_transaction_isolation`

    """

    if bind_arguments:
        # Work on a private copy: "bind" is removed below, and popping it
        # from the caller's own dictionary would silently drop the
        # explicit bind if the same dictionary were passed again.
        bind_arguments = dict(bind_arguments)
        bind = bind_arguments.pop("bind", None)

        if bind is None:
            bind = self.get_bind(**bind_arguments)
    else:
        bind = self.get_bind()

    return self._connection_for_bind(
        bind,
        execution_options=execution_options,
    )

2098 

def _connection_for_bind(
    self,
    engine: _SessionBind,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
    **kw: Any,
) -> Connection:
    """Return the connection that the current transaction holds for
    ``engine``, obtaining a transaction via autobegin when none exists.

    ``TransactionalContext._trans_ctx_check`` is invoked on the session
    before anything else.  Extra keyword arguments are accepted but are
    not used by this implementation.
    """
    TransactionalContext._trans_ctx_check(self)

    existing = self._transaction
    active = existing if existing is not None else self._autobegin_t()
    return active._connection_for_bind(engine, execution_options)

2111 

@overload
def _execute_internal(
    self,
    statement: Executable,
    params: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    _parent_execute_state: Optional[Any] = None,
    _add_event: Optional[Any] = None,
    _scalar_result: Literal[True] = ...,
) -> Any: ...

@overload
def _execute_internal(
    self,
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    _parent_execute_state: Optional[Any] = None,
    _add_event: Optional[Any] = None,
    _scalar_result: bool = ...,
) -> Result[Any]: ...

def _execute_internal(
    self,
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    _parent_execute_state: Optional[Any] = None,
    _add_event: Optional[Any] = None,
    _scalar_result: bool = False,
) -> Any:
    """Shared implementation behind :meth:`execute`, :meth:`scalar` and
    :meth:`scalars`.

    Steps, in order:

    1. coerce ``statement`` and copy ``bind_arguments``;
    2. determine whether the statement uses the ``"orm"`` compile-state
       plugin;
    3. run the ``do_orm_execute`` event hooks (or those remaining from
       ``_parent_execute_state``); a hook returning a truthy result ends
       execution early with that result;
    4. resolve the bind and connection and execute the statement.

    :param statement: the executable construct to run.
    :param params: bound parameter values; ``None`` is treated as an
     empty dictionary at execution time.
    :param execution_options: execution options, coerced to an immutable
     dictionary.
    :param bind_arguments: arguments for :meth:`get_bind`; the caller's
     mapping is copied, never modified.
    :param _parent_execute_state: when truthy, supplies the event hooks
     still to be run through its ``_remaining_events()`` method.
    :param _add_event: extra hook appended after the session's own hooks;
     only consulted when ``_parent_execute_state`` is falsy.
    :param _scalar_result: when true, return ``.scalar()`` of the result
     instead of the result object.

    :return: the result object, or a scalar value when ``_scalar_result``
     is true.
    """
    statement = coercions.expect(roles.StatementRole, statement)

    # always work on a private dict: it may be modified below
    # (setdefault) and is handed to the compile-state hooks
    if not bind_arguments:
        bind_arguments = {}
    else:
        bind_arguments = dict(bind_arguments)

    if (
        statement._propagate_attrs.get("compile_state_plugin", None)
        == "orm"
    ):
        compile_state_cls = CompileState._get_plugin_class_for_plugin(
            statement, "orm"
        )
        if TYPE_CHECKING:
            assert isinstance(
                compile_state_cls, context.AbstractORMCompileState
            )
    else:
        # not an ORM statement: the statement itself becomes the default
        # "clause" handed to get_bind()
        compile_state_cls = None
        bind_arguments.setdefault("clause", statement)

    execution_options = util.coerce_to_immutabledict(execution_options)

    # select the event hooks to run
    if _parent_execute_state:
        events_todo = _parent_execute_state._remaining_events()
    else:
        events_todo = self.dispatch.do_orm_execute
        if _add_event:
            events_todo = list(events_todo) + [_add_event]

    if events_todo:
        if compile_state_cls is not None:
            # for event handlers, do the orm_pre_session_exec
            # pass ahead of the event handlers, so that things like
            # .load_options, .update_delete_options etc. are populated.
            # is_pre_event=True allows the hook to hold off on things
            # it doesn't want to do twice, including autoflush as well
            # as "pre fetch" for DML, etc.
            (
                statement,
                execution_options,
            ) = compile_state_cls.orm_pre_session_exec(
                self,
                statement,
                params,
                execution_options,
                bind_arguments,
                True,
            )

        orm_exec_state = ORMExecuteState(
            self,
            statement,
            params,
            execution_options,
            bind_arguments,
            compile_state_cls,
            events_todo,
        )
        for idx, fn in enumerate(events_todo):
            # record the position of the hook about to run
            orm_exec_state._starting_event_idx = idx
            fn_result: Optional[Result[Any]] = fn(orm_exec_state)
            # a truthy return from a hook replaces normal execution
            if fn_result:
                if _scalar_result:
                    return fn_result.scalar()
                else:
                    return fn_result

        # pick up the statement / options as left on the execute state
        # once all hooks have run
        statement = orm_exec_state.statement
        execution_options = orm_exec_state.local_execution_options

    if compile_state_cls is not None:
        # now run orm_pre_session_exec() "for real".   if there were
        # event hooks, this will re-run the steps that interpret
        # new execution_options into load_options / update_delete_options,
        # which we assume the event hook might have updated.
        # autoflush will also be invoked in this step if enabled.
        (
            statement,
            execution_options,
        ) = compile_state_cls.orm_pre_session_exec(
            self,
            statement,
            params,
            execution_options,
            bind_arguments,
            False,
        )

    bind = self.get_bind(**bind_arguments)

    conn = self._connection_for_bind(bind)

    # non-ORM scalar request: use the connection's own scalar()
    if _scalar_result and not compile_state_cls:
        if TYPE_CHECKING:
            params = cast(_CoreSingleExecuteParams, params)
        return conn.scalar(
            statement, params or {}, execution_options=execution_options
        )

    if compile_state_cls:
        result: Result[Any] = compile_state_cls.orm_execute_statement(
            self,
            statement,
            params or {},
            execution_options,
            bind_arguments,
            conn,
        )
    else:
        result = conn.execute(
            statement, params or {}, execution_options=execution_options
        )

    if _scalar_result:
        return result.scalar()
    else:
        return result

2268 

@overload
def execute(
    self,
    statement: TypedReturnsRows[_T],
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    _parent_execute_state: Optional[Any] = None,
    _add_event: Optional[Any] = None,
) -> Result[_T]: ...

@overload
def execute(
    self,
    statement: UpdateBase,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    _parent_execute_state: Optional[Any] = None,
    _add_event: Optional[Any] = None,
) -> CursorResult[Any]: ...

@overload
def execute(
    self,
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    _parent_execute_state: Optional[Any] = None,
    _add_event: Optional[Any] = None,
) -> Result[Any]: ...

def execute(
    self,
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    _parent_execute_state: Optional[Any] = None,
    _add_event: Optional[Any] = None,
) -> Result[Any]:
    r"""Execute a SQL expression construct and return its
    :class:`_engine.Result`.

    E.g.::

        from sqlalchemy import select

        result = session.execute(select(User).where(User.id == 5))

    The API contract of :meth:`_orm.Session.execute` is similar to that
    of :meth:`_engine.Connection.execute`, the :term:`2.0 style` version
    of :class:`_engine.Connection`.

    .. versionchanged:: 1.4 the :meth:`_orm.Session.execute` method is
       now the primary point of ORM statement execution when using
       :term:`2.0 style` ORM usage.

    :param statement:
        An executable statement (i.e. an :class:`.Executable` expression
        such as :func:`_expression.select`).

    :param params:
        Optional dictionary, or list of dictionaries, of bound parameter
        values.  A single dictionary gives single-row execution; a list
        of dictionaries invokes an "executemany".  Keys must correspond
        to parameter names present in the statement.

    :param execution_options: optional dictionary of execution options
     to associate with the statement execution.  It may carry a subset of
     the options accepted by
     :meth:`_engine.Connection.execution_options`, as well as options
     understood only in an ORM context.

     .. seealso::

        :ref:`orm_queryguide_execution_options` - ORM-specific execution
        options

    :param bind_arguments: dictionary of additional arguments to determine
     the bind.  May include "mapper", "bind", or other custom arguments.
     Contents of this dictionary are passed to the
     :meth:`.Session.get_bind` method.

    :return: a :class:`_engine.Result` object.

    """
    # all of the work happens in the shared internal implementation
    result = self._execute_internal(
        statement,
        params,
        execution_options=execution_options,
        bind_arguments=bind_arguments,
        _parent_execute_state=_parent_execute_state,
        _add_event=_add_event,
    )
    return result

2373 

@overload
def scalar(
    self,
    statement: TypedReturnsRows[Tuple[_T]],
    params: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> Optional[_T]: ...

@overload
def scalar(
    self,
    statement: Executable,
    params: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> Any: ...

def scalar(
    self,
    statement: Executable,
    params: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> Any:
    """Execute a statement and return a scalar result.

    Takes the same arguments, used the same way, as
    :meth:`_orm.Session.execute`; what comes back is a scalar Python
    value.  Extra keyword arguments are forwarded unchanged to the
    internal execution routine.

    """
    value = self._execute_internal(
        statement,
        params,
        execution_options=execution_options,
        bind_arguments=bind_arguments,
        _scalar_result=True,
        **kw,
    )
    return value

2421 

@overload
def scalars(
    self,
    statement: TypedReturnsRows[Tuple[_T]],
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> ScalarResult[_T]: ...

@overload
def scalars(
    self,
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> ScalarResult[Any]: ...

def scalars(
    self,
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
    **kw: Any,
) -> ScalarResult[Any]:
    """Execute a statement and return the results as scalars.

    Takes the same arguments, used the same way, as
    :meth:`_orm.Session.execute`; what comes back is a
    :class:`_result.ScalarResult` filtering object which
    will return single elements rather than :class:`_row.Row` objects.

    :return:  a :class:`_result.ScalarResult` object

    .. versionadded:: 1.4.24 Added :meth:`_orm.Session.scalars`

    .. versionadded:: 1.4.26 Added :meth:`_orm.scoped_session.scalars`

    .. seealso::

        :ref:`orm_queryguide_select_orm_entities` - contrasts the behavior
        of :meth:`_orm.Session.execute` to :meth:`_orm.Session.scalars`

    """
    result = self._execute_internal(
        statement,
        params=params,
        execution_options=execution_options,
        bind_arguments=bind_arguments,
        _scalar_result=False,  # mypy appreciates this
        **kw,
    )
    # apply the scalar filter to the full result
    return result.scalars()

2481 

def close(self) -> None:
    """Close out the transactional resources and ORM objects used by this
    :class:`_orm.Session`.

    All ORM objects associated with this :class:`_orm.Session` are
    expunged, any transaction in progress is ended, and every
    :class:`_engine.Connection` that this :class:`_orm.Session` itself
    checked out from an :class:`_engine.Engine` is :term:`released
    <releases>`.  Afterwards the :class:`_orm.Session` is left in a state
    in which it may be used again.

    .. tip::

        In the default running mode the :meth:`_orm.Session.close`
        method **does not prevent the Session from being used again**.
        The :class:`_orm.Session` has no distinct "closed" state;
        closing merely means that all database connections and ORM
        objects are released.

        Setting the parameter :paramref:`_orm.Session.close_resets_only`
        to ``False`` instead makes the ``close`` final: any further
        action on the session will be forbidden.

    .. versionchanged:: 1.4  The :meth:`.Session.close` method does not
       immediately create a new :class:`.SessionTransaction` object;
       instead, the new :class:`.SessionTransaction` is created only if
       the :class:`.Session` is used again for a database operation.

    .. seealso::

        :ref:`session_closing` - detail on the semantics of
        :meth:`_orm.Session.close` and :meth:`_orm.Session.reset`.

        :meth:`_orm.Session.reset` - a similar method that behaves like
        ``close()`` with  the parameter
        :paramref:`_orm.Session.close_resets_only` set to ``True``.

    """
    # plain close: connections are released, not invalidated
    self._close_impl(invalidate=False)

2522 

def reset(self) -> None:
    """Close out the transactional resources and ORM objects used by this
    :class:`_orm.Session`, resetting the session to its initial state.

    This gives the "reset-only" behavior that
    :meth:`_orm.Session.close` has provided historically: the
    :class:`_orm.Session` is reset as though it were brand new and is
    ready to be used again.  It is therefore useful for
    :class:`_orm.Session` objects that set
    :paramref:`_orm.Session.close_resets_only` to ``False``, so that
    "reset only" behavior remains available.

    .. versionadded:: 2.0.22

    .. seealso::

        :ref:`session_closing` - detail on the semantics of
        :meth:`_orm.Session.close` and :meth:`_orm.Session.reset`.

        :meth:`_orm.Session.close` - a similar method will additionally
        prevent re-use of the Session when the parameter
        :paramref:`_orm.Session.close_resets_only` is set to ``False``.
    """
    # is_reset=True keeps the close implementation from marking the
    # session as permanently closed
    self._close_impl(invalidate=False, is_reset=True)

2547 

def invalidate(self) -> None:
    """Close this Session, using connection invalidation.

    A variant of :meth:`.Session.close` which additionally ensures that
    :meth:`_engine.Connection.invalidate` is called on each
    :class:`_engine.Connection` object currently in use for a
    transaction (typically just one, unless the :class:`_orm.Session`
    is used with multiple engines).

    Call this when the database is known to be in a state where the
    connections are no longer safe to use.

    The example below uses `gevent <https://www.gevent.org/>`_, which can
    produce ``Timeout`` exceptions that may mean the underlying
    connection should be discarded::

        import gevent

        try:
            sess = Session()
            sess.add(User())
            sess.commit()
        except gevent.Timeout:
            sess.invalidate()
            raise
        except:
            sess.rollback()
            raise

    Everything that :meth:`_orm.Session.close` does is done here as well,
    including that all ORM objects are expunged.

    """
    self._close_impl(invalidate=True)

2583 

2584 def _close_impl(self, invalidate: bool, is_reset: bool = False) -> None: 

2585 if not is_reset and self._close_state is _SessionCloseState.ACTIVE: 

2586 self._close_state = _SessionCloseState.CLOSED 

2587 self.expunge_all() 

2588 if self._transaction is not None: 

2589 for transaction in self._transaction._iterate_self_and_parents(): 

2590 transaction.close(invalidate) 

2591 

def expunge_all(self) -> None:
    """Remove all object instances from this ``Session``.

    Equivalent to calling ``expunge(obj)`` on every object in this
    ``Session``.

    """
    # gather every tracked state before the collections are replaced
    detached = self.identity_map.all_states() + list(self._new)

    self.identity_map._kill()
    self.identity_map = identity.WeakInstanceDict()
    self._new, self._deleted = {}, {}

    statelib.InstanceState._detach_states(detached, self)

2607 

def _add_bind(self, key: _SessionBindKey, bind: _SessionBind) -> None:
    """Register ``bind`` for ``key`` in this session's bind lookup.

    :param key: the bind target.  Accepted targets are: an object whose
     inspection is a ``TableClause``; an object whose inspection is a
     mapper (the mapped class and each of the mapper's tables are
     registered); or, when the object cannot be inspected at all, any
     plain Python class.
    :param bind: the bind to associate with the target.

    :raises sqlalchemy.exc.ArgumentError: if ``key`` is none of the
     accepted targets.
    """
    try:
        insp = inspect(key)
    except sa_exc.NoInspectionAvailable as err:
        if not isinstance(key, type):
            # Format with an f-string rather than ``"%s" % key``: with the
            # ``%`` operator a tuple key is unpacked as the argument list
            # and raises TypeError instead of this ArgumentError.
            raise sa_exc.ArgumentError(
                f"Not an acceptable bind target: {key!s}"
            ) from err
        else:
            # a non-inspectable plain class is stored as-is
            self.__binds[key] = bind
    else:
        if TYPE_CHECKING:
            assert isinstance(insp, Inspectable)

        if isinstance(insp, TableClause):
            self.__binds[insp] = bind
        elif insp_is_mapper(insp):
            # register the mapped class plus every table of the mapper
            self.__binds[insp.class_] = bind
            for _selectable in insp._all_tables:
                self.__binds[_selectable] = bind
        else:
            raise sa_exc.ArgumentError(
                f"Not an acceptable bind target: {key!s}"
            )

2632 

def bind_mapper(
    self, mapper: _EntityBindKey[_O], bind: _SessionBind
) -> None:
    """Associate a :class:`_orm.Mapper` or arbitrary Python class with a
    "bind", e.g. an :class:`_engine.Engine` or
    :class:`_engine.Connection`.

    The entity is registered in the lookup that
    :meth:`.Session.get_bind` consults.

    :param mapper: a :class:`_orm.Mapper` object,
     or an instance of a mapped
     class, or any Python class that is the base of a set of mapped
     classes.

    :param bind: an :class:`_engine.Engine` or :class:`_engine.Connection`
                object.

    .. seealso::

        :ref:`session_partitioning`

        :paramref:`.Session.binds`

        :meth:`.Session.bind_table`

    """
    entity = mapper
    self._add_bind(entity, bind)

2662 

def bind_table(self, table: TableClause, bind: _SessionBind) -> None:
    """Associate a :class:`_schema.Table` with a "bind", e.g. an
    :class:`_engine.Engine`
    or :class:`_engine.Connection`.

    The :class:`_schema.Table` is registered in the lookup that
    :meth:`.Session.get_bind` consults.

    :param table: a :class:`_schema.Table` object,
     which is typically the target
     of an ORM mapping, or is present within a selectable that is
     mapped.

    :param bind: an :class:`_engine.Engine` or :class:`_engine.Connection`
     object.

    .. seealso::

        :ref:`session_partitioning`

        :paramref:`.Session.binds`

        :meth:`.Session.bind_mapper`

    """
    target = table
    self._add_bind(target, bind)

2690 

def get_bind(
    self,
    mapper: Optional[_EntityBindKey[_O]] = None,
    *,
    clause: Optional[ClauseElement] = None,
    bind: Optional[_SessionBind] = None,
    _sa_skip_events: Optional[bool] = None,
    _sa_skip_for_implicit_returning: bool = False,
    **kw: Any,
) -> Union[Engine, Connection]:
    """Resolve the "bind" this :class:`.Session` should use.

    The result is normally an :class:`_engine.Engine`, or a
    :class:`_engine.Connection` when the :class:`.Session` was bound
    directly to one.  For a :class:`.Session` that is unbound or that has
    per-entity binds, the ``mapper`` and ``clause`` arguments supply the
    context used to choose a bind.

    Resolution proceeds in this order:

    1. A truthy ``bind`` argument is returned unchanged.
    2. If no per-entity binds are configured and the session has a
       session-level ``bind``, that bind is returned.
    3. If neither ``mapper`` nor ``clause`` is given, the session-level
       bind is returned, or
       :exc:`~sqlalchemy.exc.UnboundExecutionError` is raised when there
       is none.
    4. If per-entity binds are configured and a mapper was given, the
       ``__mro__`` of the mapped class is walked from most specific to
       most general; the first class found in the binds collection wins.
    5. If per-entity binds are configured and a clause is available (the
       mapper's ``persist_selectable`` stands in when no clause was
       passed), the ``__mro__`` of the clause's ``"plugin_subject"``
       mapped class is tried first, then every element produced by
       traversing the clause is looked up in the binds collection.
    6. The session-level bind is returned if present.
    7. Otherwise :exc:`~sqlalchemy.exc.UnboundExecutionError` is raised.

    This method is a subclassing hook; a user-defined subclass of
    :class:`.Session` may override it to implement any bind resolution
    scheme.  See the example at :ref:`session_custom_partitioning`.

    :param mapper:
      Optional mapped class or corresponding :class:`_orm.Mapper`.

    :param clause:
      Optional :class:`_expression.ClauseElement` (e.g.
      :func:`_expression.select`, :func:`_expression.text`) that is
      searched for elements present in the session's binds collection.

    :param bind:
      Optional bind which, when truthy, short-circuits all other
      resolution.

    :param _sa_skip_events: accepted but not consulted by this
      implementation.

    :param _sa_skip_for_implicit_returning: accepted but not consulted by
      this implementation.

    :param \**kw: accepted but not consulted by this implementation.

    :raises sqlalchemy.orm.exc.UnmappedClassError: ``mapper`` is a class
      that cannot be inspected.

    :raises sqlalchemy.exc.UnboundExecutionError: no bind could be
      located.

    .. seealso::

        :ref:`session_partitioning`

        :paramref:`.Session.binds`

        :meth:`.Session.bind_mapper`

        :meth:`.Session.bind_table`

    """

    # Because this method is a documented override point, callers invoke
    # it even when the outcome is trivially the bind they supplied.
    if bind:
        return bind

    binds = self.__binds
    if not binds and self.bind:
        # by far the most common configuration: one bind, nothing
        # entity-specific to consult
        return self.bind

    # From here on either entity-specific binds exist, or there is no
    # session-level bind at all; the mapper / clause must provide context.
    if mapper is None and clause is None:
        if not self.bind:
            raise sa_exc.UnboundExecutionError(
                "This session is not bound to a single Engine or "
                "Connection, and no context was provided to locate "
                "a binding."
            )
        return self.bind

    inspected_mapper = None
    if mapper is not None:
        try:
            inspected_mapper = inspect(mapper)
        except sa_exc.NoInspectionAvailable as err:
            # only a plain class gets the friendlier "unmapped" error
            if not isinstance(mapper, type):
                raise
            raise exc.UnmappedClassError(mapper) from err

    if binds:
        # supported use case: match mapped classes and selectables
        # against the entries of the binds dictionary
        if inspected_mapper:
            for klass in inspected_mapper.class_.__mro__:
                if klass in binds:
                    return binds[klass]
            if clause is None:
                clause = inspected_mapper.persist_selectable

        if clause is not None:
            subject = clause._propagate_attrs.get("plugin_subject", None)

            if subject is not None:
                for klass in subject.mapper.class_.__mro__:
                    if klass in binds:
                        return binds[klass]

            for element in visitors.iterate(clause):
                if element in binds:
                    if TYPE_CHECKING:
                        assert isinstance(element, Table)
                    return binds[element]

    # nothing entity-specific matched; use the session-wide bind if any
    if self.bind:
        return self.bind

    searched = []
    if inspected_mapper is not None:
        searched.append(f"mapper {inspected_mapper}")
    if clause is not None:
        searched.append("SQL expression")

    raise sa_exc.UnboundExecutionError(
        f"Could not locate a bind configured on "
        f'{", ".join(searched)} or this Session.'
    )

2849 

# Typing-only signatures for ``Session.query``; every stub body is ``...``.

# One entity argument: the resulting ``Query`` is typed to that entity.
@overload
def query(self, _entity: _EntityType[_O]) -> Query[_O]: ...

# One typed column expression: rows are one-element tuples.
@overload
def query(
    self, _colexpr: TypedColumnsClauseRole[_T]
) -> RowReturningQuery[Tuple[_T]]: ...

# START OVERLOADED FUNCTIONS self.query RowReturningQuery 2-8

# code within this block is **programmatically,
# statically generated** by tools/generate_tuple_map_overloads.py

@overload
def query(
    self, __ent0: _TCCA[_T0], __ent1: _TCCA[_T1]
) -> RowReturningQuery[Tuple[_T0, _T1]]: ...

@overload
def query(
    self, __ent0: _TCCA[_T0], __ent1: _TCCA[_T1], __ent2: _TCCA[_T2]
) -> RowReturningQuery[Tuple[_T0, _T1, _T2]]: ...

@overload
def query(
    self,
    __ent0: _TCCA[_T0],
    __ent1: _TCCA[_T1],
    __ent2: _TCCA[_T2],
    __ent3: _TCCA[_T3],
) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3]]: ...

@overload
def query(
    self,
    __ent0: _TCCA[_T0],
    __ent1: _TCCA[_T1],
    __ent2: _TCCA[_T2],
    __ent3: _TCCA[_T3],
    __ent4: _TCCA[_T4],
) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3, _T4]]: ...

@overload
def query(
    self,
    __ent0: _TCCA[_T0],
    __ent1: _TCCA[_T1],
    __ent2: _TCCA[_T2],
    __ent3: _TCCA[_T3],
    __ent4: _TCCA[_T4],
    __ent5: _TCCA[_T5],
) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3, _T4, _T5]]: ...

@overload
def query(
    self,
    __ent0: _TCCA[_T0],
    __ent1: _TCCA[_T1],
    __ent2: _TCCA[_T2],
    __ent3: _TCCA[_T3],
    __ent4: _TCCA[_T4],
    __ent5: _TCCA[_T5],
    __ent6: _TCCA[_T6],
) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6]]: ...

@overload
def query(
    self,
    __ent0: _TCCA[_T0],
    __ent1: _TCCA[_T1],
    __ent2: _TCCA[_T2],
    __ent3: _TCCA[_T3],
    __ent4: _TCCA[_T4],
    __ent5: _TCCA[_T5],
    __ent6: _TCCA[_T6],
    __ent7: _TCCA[_T7],
) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7]]: ...

# END OVERLOADED FUNCTIONS self.query

# Catch-all signature for any other combination of arguments.
@overload
def query(
    self, *entities: _ColumnsClauseArgument[Any], **kwargs: Any
) -> Query[Any]: ...

2934 

def query(
    self, *entities: _ColumnsClauseArgument[Any], **kwargs: Any
) -> Query[Any]:
    """Build a :class:`_query.Query` bound to this :class:`_orm.Session`.

    The positional ``entities`` are handed, as a single tuple, to the
    session's configured query class together with the session itself;
    keyword arguments are forwarded unchanged.

    As of SQLAlchemy 2.0 the :class:`_query.Query` object is legacy;
    ORM queries are now constructed with :func:`_sql.select`.

    .. seealso::

        :ref:`unified_tutorial`

        :ref:`queryguide_toplevel`

        :ref:`query_api_toplevel` - legacy API doc

    """

    query_factory = self._query_cls
    return query_factory(entities, self, **kwargs)

2956 

def _identity_lookup(
    self,
    mapper: Mapper[_O],
    primary_key_identity: Union[Any, Tuple[Any, ...]],
    identity_token: Any = None,
    passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
    lazy_loaded_from: Optional[InstanceState[Any]] = None,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
) -> Union[Optional[_O], LoaderCallableStatus]:
    """Look up an object in the identity map by primary key.

    An identity key is built from the given primary key identity and
    identity token, then handed to :func:`.loading.get_from_identity`
    along with this session.  An object that is found may be run through
    unexpiration rules (e.g. loading unloaded attributes, checking
    whether it was deleted).

    e.g.::

        obj = session._identity_lookup(inspect(SomeClass), (1,))

    :param mapper: mapper in use
    :param primary_key_identity: the primary key being searched for, as
        a tuple.
    :param identity_token: identity token used to create the identity
        key.  It is used as is here; overriding subclasses may interpret
        it specially, e.g. treating ``None`` as "search several tokens".
    :param passive: passive load flag passed to
        :func:`.loading.get_from_identity`.  It affects what happens when
        the object is found: the object may be validated and/or unexpired
        if the flag allows SQL to be emitted.
    :param lazy_loaded_from: an :class:`.InstanceState` that is asking
        for this identity as a related identity.  Intended for sharding
        schemes that relate an object to the object being lazy-loaded (or
        otherwise relationship-loaded); not consulted by this
        implementation.
    :param execution_options: not consulted by this implementation.
    :param bind_arguments: not consulted by this implementation.

    :return: ``None`` if the object is not in the identity map, *or* if
        it was unexpired and found to have been deleted.  If the passive
        flags disallow SQL and the object is expired, returns
        PASSIVE_NO_RESULT.  In all other cases the instance is returned.

    .. versionchanged:: 1.4.0 - the :meth:`.Session._identity_lookup`
        method was moved from :class:`_query.Query` to
        :class:`.Session`, to avoid having to instantiate the
        :class:`_query.Query` object.

    """

    identity_key = mapper.identity_key_from_primary_key(
        primary_key_identity, identity_token=identity_token
    )

    # The named intermediate is deliberate; it works around
    # https://github.com/python/typing/discussions/1143
    found = loading.get_from_identity(self, mapper, identity_key, passive)
    return found

3015 

@util.non_memoized_property
@contextlib.contextmanager
def no_autoflush(self) -> Iterator[Session]:
    """Return a context manager that switches autoflush off.

    e.g.::

        with session.no_autoflush:

            some_object = SomeClass()
            session.add(some_object)
            # won't autoflush
            some_object.related_thing = session.query(SomeRelated).first()

    Queries issued inside the ``with:`` block do not trigger a flush.
    This helps when building up a series of objects that require
    database queries along the way, while the still-incomplete objects
    must not be flushed yet.

    The session's ``autoflush`` setting is restored to its prior value
    when the block exits, including when it exits with an exception.

    """
    previous = self.autoflush
    self.autoflush = False
    try:
        yield self
    finally:
        # put back whatever was configured before, not unconditionally True
        self.autoflush = previous

3043 

@util.langhelpers.tag_method_for_warnings(
    "This warning originated from the Session 'autoflush' process, "
    "which was invoked automatically in response to a user-initiated "
    "operation. Consider using ``no_autoflush`` context manager if this "
    "warning happened while initializing objects.",
    sa_exc.SAWarning,
)
def _autoflush(self) -> None:
    """Flush pending changes if autoflush is enabled.

    Nothing happens when ``self.autoflush`` is false or when a flush is
    already in progress (``self._flushing``).

    :raises sqlalchemy.exc.StatementError: re-raised from the flush with
      an extra detail message pointing at autoflush as the trigger.
    """
    if self.autoflush and not self._flushing:
        try:
            self.flush()
        except sa_exc.StatementError as e:
            # The StatementError itself is re-raised, rather than a
            # FlushError chained to it, so that code catching
            # StatementError, IntegrityError, etc. keeps working.
            e.add_detail(
                "raised as a result of Query-invoked autoflush; "
                "consider using a session.no_autoflush block if this "
                "flush is occurring prematurely"
            )
            raise e.with_traceback(sys.exc_info()[2])

3066 

def refresh(
    self,
    instance: object,
    attribute_names: Optional[Iterable[str]] = None,
    with_for_update: ForUpdateParameter = None,
) -> None:
    """Expire, then reload, attributes on the given instance.

    The selected attributes are expired exactly as
    :meth:`_orm.Session.expire` would expire them.  A SELECT is then
    emitted so that column-oriented attributes take on the value
    currently visible in the current transaction.

    :func:`_orm.relationship` oriented attributes that were already
    eagerly loaded on the object are reloaded immediately as well, with
    the same eager loading strategy that loaded them originally.

    .. versionadded:: 1.4 - the :meth:`_orm.Session.refresh` method
       can also refresh eagerly loaded attributes.

    :func:`_orm.relationship` oriented attributes that normally use the
    ``select`` (or "lazy") loader strategy are loaded too, **but only
    when they are named explicitly in the attribute_names collection**;
    a SELECT using the ``immediate`` loader strategy is emitted for each.
    Lazy-loaded relationships not named in
    :paramref:`_orm.Session.refresh.attribute_names` stay "lazy loaded"
    and are not implicitly refreshed.

    .. versionchanged:: 2.0.4 The :meth:`_orm.Session.refresh` method
       will now refresh lazy-loaded :func:`_orm.relationship` oriented
       attributes for those which are named explicitly in the
       :paramref:`_orm.Session.refresh.attribute_names` collection.

    .. tip::

        Although :meth:`_orm.Session.refresh` can refresh both column and
        relationship oriented attributes, it is aimed at local
        column-oriented attributes of a single instance.  For open ended
        "refresh" behavior, such as refreshing many objects at once with
        explicit control over relationship loader strategies, use the
        :ref:`populate existing <orm_queryguide_populate_existing>`
        feature instead.

    A highly isolated transaction returns the same values that were
    previously read in that transaction, regardless of changes made to
    the database outside of it.  Refreshing therefore usually only makes
    sense at the start of a transaction, before rows have been accessed.

    :param instance: the mapped instance to refresh.

    :param attribute_names: optional.  An iterable collection of
      string attribute names indicating a subset of attributes to
      be refreshed.

    :param with_for_update: optional boolean ``True`` indicating FOR UPDATE
      should be used, or may be a dictionary containing flags to
      indicate a more specific set of FOR UPDATE flags for the SELECT;
      flags should match the parameters of
      :meth:`_query.Query.with_for_update`.  An empty dictionary is
      rejected as ambiguous.

    :raises sqlalchemy.orm.exc.UnmappedInstanceError: ``instance`` has no
      instrumentation state.

    :raises sqlalchemy.exc.ArgumentError: ``with_for_update`` is an empty
      dictionary.

    :raises sqlalchemy.exc.InvalidRequestError: the load returned no
      object for the instance.

    .. seealso::

        :ref:`session_expire` - introductory material

        :meth:`.Session.expire`

        :meth:`.Session.expire_all`

        :ref:`orm_queryguide_populate_existing` - allows any ORM query
        to refresh objects as they would be loaded normally.

    """
    try:
        state = attributes.instance_state(instance)
    except exc.NO_STATE as err:
        raise exc.UnmappedInstanceError(instance) from err

    self._expire_state(state, attribute_names)

    # Autoflush happens here, up front.  It used to occur as a side
    # effect of load_on_ident below: the SELECT was organized around the
    # current DB pks, then the flush ran, and if that flush changed the
    # pks the load crashed.  Unticketed, found while working on #8703.
    self._autoflush()

    if with_for_update == {}:
        raise sa_exc.ArgumentError(
            "with_for_update should be the boolean value "
            "True, or a dictionary with options. "
            "A blank dictionary is ambiguous."
        )

    with_for_update = ForUpdateArg._from_argument(with_for_update)

    stmt: Select[Any] = sql.select(object_mapper(instance))
    reloaded = loading.load_on_ident(
        self,
        stmt,
        state.key,
        refresh_state=state,
        with_for_update=with_for_update,
        only_load_props=attribute_names,
        require_pk_cols=True,
        # the flush already ran above, so this merely avoids a second,
        # redundant call to _autoflush()
        no_autoflush=True,
        is_user_refresh=True,
    )
    if reloaded is None:
        raise sa_exc.InvalidRequestError(
            "Could not refresh instance '%s'" % instance_str(instance)
        )

3187 

def expire_all(self) -> None:
    """Expire every persistent instance held by this Session.

    The next time any attribute of a persistent instance is accessed, a
    query is issued in the :class:`.Session` object's current
    transactional context to load all expired attributes of that
    instance.  A highly isolated transaction returns the same values
    that were previously read in that transaction, regardless of changes
    made to the database outside of it.

    Use :meth:`Session.expire` to expire individual objects, or
    individual attributes on them.

    By default the :class:`.Session` expires all state whenever
    :meth:`Session.rollback` or :meth:`Session.commit` is called, so
    that fresh state is loaded for the new transaction.  Calling
    :meth:`Session.expire_all` is therefore not usually needed, assuming
    the transaction is isolated.

    .. seealso::

        :ref:`session_expire` - introductory material

        :meth:`.Session.expire`

        :meth:`.Session.refresh`

        :meth:`_orm.Query.populate_existing`

    """
    identity_map = self.identity_map
    modified = identity_map._modified
    for persistent_state in identity_map.all_states():
        persistent_state._expire(persistent_state.dict, modified)

3222 

def expire(
    self, instance: object, attribute_names: Optional[Iterable[str]] = None
) -> None:
    """Mark the attributes of an instance as out of date.

    The next time an expired attribute is accessed, a query is issued in
    the :class:`.Session` object's current transactional context to load
    all expired attributes of the instance.  A highly isolated
    transaction returns the same values that were previously read in
    that transaction, regardless of changes made to the database outside
    of it.

    Use :meth:`Session.expire_all` to expire every object in the
    :class:`.Session` at once.

    By default the :class:`.Session` expires all state whenever
    :meth:`Session.rollback` or :meth:`Session.commit` is called, so
    that fresh state is loaded for the new transaction.  Calling
    :meth:`Session.expire` therefore only makes sense in the specific
    case that a non-ORM SQL statement was emitted in the current
    transaction.

    :param instance: The instance to be expired.
    :param attribute_names: optional list of string attribute names
      indicating a subset of attributes to be expired.

    :raises sqlalchemy.orm.exc.UnmappedInstanceError: ``instance`` has no
      instrumentation state.

    .. seealso::

        :ref:`session_expire` - introductory material

        :meth:`.Session.expire_all`

        :meth:`.Session.refresh`

        :meth:`_orm.Query.populate_existing`

    """
    try:
        instance_state = attributes.instance_state(instance)
    except exc.NO_STATE as err:
        raise exc.UnmappedInstanceError(instance) from err

    self._expire_state(instance_state, attribute_names)

3267 

3268 def _expire_state( 

3269 self, 

3270 state: InstanceState[Any], 

3271 attribute_names: Optional[Iterable[str]], 

3272 ) -> None: 

3273 self._validate_persistent(state) 

3274 if attribute_names: 

3275 state._expire_attributes(state.dict, attribute_names) 

3276 else: 

3277 # pre-fetch the full cascade since the expire is going to 

3278 # remove associations 

3279 cascaded = list( 

3280 state.manager.mapper.cascade_iterator("refresh-expire", state) 

3281 ) 

3282 self._conditional_expire(state) 

3283 for o, m, st_, dct_ in cascaded: 

3284 self._conditional_expire(st_) 

3285 

3286 def _conditional_expire( 

3287 self, state: InstanceState[Any], autoflush: Optional[bool] = None 

3288 ) -> None: 

3289 """Expire a state if persistent, else expunge if pending""" 

3290 

3291 if state.key: 

3292 state._expire(state.dict, self.identity_map._modified) 

3293 elif state in self._new: 

3294 self._new.pop(state) 

3295 state._detach(self) 

3296 

def expunge(self, instance: object) -> None:
    """Remove the `instance` from this ``Session``.

    This will free all internal references to the instance.  Cascading
    will be applied according to the *expunge* cascade rule.

    :param instance: the mapped instance to remove.

    :raises sqlalchemy.orm.exc.UnmappedInstanceError: ``instance`` has no
      instrumentation state.

    :raises sqlalchemy.exc.InvalidRequestError: the instance's state is
      not associated with this session.

    """
    try:
        state = attributes.instance_state(instance)
    except exc.NO_STATE as err:
        raise exc.UnmappedInstanceError(instance) from err

    # Session ids are compared by value.  An identity test (``is not``)
    # would only succeed when both sides are the very same object, which
    # is an implementation detail rather than part of the contract.
    if state.session_id != self.hash_key:
        raise sa_exc.InvalidRequestError(
            "Instance %s is not present in this Session" % state_str(state)
        )

    # Gather the cascade before anything is removed.
    cascaded = list(
        state.manager.mapper.cascade_iterator("expunge", state)
    )
    self._expunge_states([state] + [st_ for o, m, st_, dct_ in cascaded])

3317 

def _expunge_states(
    self, states: Iterable[InstanceState[Any]], to_transient: bool = False
) -> None:
    """Drop the given states from this session's collections and detach.

    Each state is removed from whichever collection currently tracks it:
    ``self._new``, the identity map (together with ``self._deleted``),
    or the ``_deleted`` collection of the current transaction.  All
    states are then passed to ``InstanceState._detach_states()``.

    :param states: states to remove; any iterable, including a one-shot
      iterator.
    :param to_transient: forwarded to ``InstanceState._detach_states()``.
    """
    # ``states`` is consumed twice below, so materialize it once.  A
    # generator would otherwise be exhausted by the loop and nothing
    # would be detached.
    states = list(states)

    for state in states:
        if state in self._new:
            self._new.pop(state)
        elif self.identity_map.contains_state(state):
            self.identity_map.safe_discard(state)
            self._deleted.pop(state, None)
        elif self._transaction:
            # state is "detached" from being deleted, but still present
            # in the transaction snapshot
            self._transaction._deleted.pop(state, None)

    statelib.InstanceState._detach_states(
        states, self, to_transient=to_transient
    )

3334 

def _register_persistent(self, states: Set[InstanceState[Any]]) -> None:
    """Register every persistent object produced by a flush.

    Handles both pending objects that are becoming persistent and
    objects that were persistent already.  For each state whose object
    is still alive, the identity key is computed and validated, assigned
    to the state (recording a key switch on the transaction if it
    changed), and the state is placed into the identity map.  Afterwards
    all states are committed, registered as altered, announced through
    the ``pending_to_persistent`` event if it has listeners, and removed
    from ``self._new``.

    :raises sqlalchemy.orm.exc.FlushError: a state's identity key is
      entirely NULL, or partially NULL while the mapper disallows
      partial primary keys.
    """

    on_persistent = self.dispatch.pending_to_persistent or None

    for state in states:
        mapper = _state_mapper(state)

        # Hold a strong reference for this iteration, guarding against
        # a last-minute dereference of the object.
        instance = state.obj()
        if instance is None:
            continue

        new_key = mapper._identity_key_from_state(state)
        pk_values = new_key[1]

        if (
            _none_set.intersection(pk_values)
            and not mapper.allow_partial_pks
        ) or _none_set.issuperset(pk_values):
            raise exc.FlushError(
                "Instance %s has a NULL identity key. If this is an "
                "auto-generated value, check that the database table "
                "allows generation of new primary key values, and "
                "that the mapped Column object is configured to "
                "expect these generated values. Ensure also that "
                "this flush() is not occurring at an inappropriate "
                "time, such as within a load() event."
                % state_str(state)
            )

        if state.key is None:
            state.key = new_key
        elif state.key != new_key:
            # Primary key switch.  safe_discard() is used because
            # another state may already have taken this one's place in
            # the identity map (see test/orm/test_naturalpks.py
            # ReversePKsTest).
            self.identity_map.safe_discard(state)
            txn = self._transaction
            assert txn is not None
            switches = txn._key_switches
            # keep the key the state started the transaction with
            original_key = (
                switches[state][0] if state in switches else state.key
            )
            switches[state] = (original_key, new_key)
            state.key = new_key

        # Swapping the primary keys of two instances can leave another
        # state in the identity map that gets replaced here; see
        # test/orm/test_naturalpks.py -> test_reverse
        displaced = self.identity_map.replace(state)
        if (
            displaced is not None
            and mapper._identity_key_from_state(displaced) == new_key
            and displaced.obj() is not None
        ):
            util.warn(
                "Identity map already had an identity for %s, "
                "replacing it with newly flushed object. Are there "
                "load operations occurring inside of an event handler "
                "within the flush?" % (new_key,)
            )
        state._orphaned_outside_of_session = False

    statelib.InstanceState._commit_all_states(
        ((st, st.dict) for st in states), self.identity_map
    )

    self._register_altered(states)

    if on_persistent is not None:
        for st in states.intersection(self._new):
            on_persistent(self, st)

    # Removal from ``_new`` comes last; that collection might hold the
    # last strong reference.
    for st in set(states).intersection(self._new):
        self._new.pop(st)

3417 

3418 def _register_altered(self, states: Iterable[InstanceState[Any]]) -> None: 

3419 if self._transaction: 

3420 for state in states: 

3421 if state in self._new: 

3422 self._transaction._new[state] = True 

3423 else: 

3424 self._transaction._dirty[state] = True 

3425 

3426 def _remove_newly_deleted( 

3427 self, states: Iterable[InstanceState[Any]] 

3428 ) -> None: 

3429 persistent_to_deleted = self.dispatch.persistent_to_deleted or None 

3430 for state in states: 

3431 if self._transaction: 

3432 self._transaction._deleted[state] = True 

3433 

3434 if persistent_to_deleted is not None: 

3435 # get a strong reference before we pop out of 

3436 # self._deleted 

3437 obj = state.obj() # noqa 

3438 

3439 self.identity_map.safe_discard(state) 

3440 self._deleted.pop(state, None) 

3441 state._deleted = True 

3442 # can't call state._detach() here, because this state 

3443 # is still in the transaction snapshot and needs to be 

3444 # tracked as part of that 

3445 if persistent_to_deleted is not None: 

3446 persistent_to_deleted(self, state) 

3447 

def add(self, instance: object, _warn: bool = True) -> None:
    """Place an object into this :class:`_orm.Session`.

    An object in the :term:`transient` state becomes :term:`pending`
    when passed to :meth:`_orm.Session.add`, and stays so until the next
    flush, at which point it becomes :term:`persistent`.

    An object in the :term:`detached` state moves directly to the
    :term:`persistent` state.

    If the transaction used by the :class:`_orm.Session` is rolled back,
    objects that were transient when they were passed to
    :meth:`_orm.Session.add` return to the :term:`transient` state and
    are no longer present within this :class:`_orm.Session`.

    :param instance: the mapped instance to add.
    :param _warn: when true and ``self._warn_on_events`` is set, a flush
      warning naming ``Session.add()`` is issued first.

    :raises sqlalchemy.orm.exc.UnmappedInstanceError: ``instance`` has no
      instrumentation state.

    .. seealso::

        :meth:`_orm.Session.add_all`

        :ref:`session_adding` - at :ref:`session_basics`

    """
    if _warn and self._warn_on_events:
        self._flush_warning("Session.add()")

    try:
        instance_state = attributes.instance_state(instance)
    except exc.NO_STATE as err:
        raise exc.UnmappedInstanceError(instance) from err

    self._save_or_update_state(instance_state)

3482 

def add_all(self, instances: Iterable[object]) -> None:
    """Add each instance of the given collection to this
    :class:`_orm.Session`.

    Every element is passed to :meth:`_orm.Session.add`; see that method
    for a general behavioral description.  At most one flush warning,
    naming ``Session.add_all()``, is issued for the whole call.

    .. seealso::

        :meth:`_orm.Session.add`

        :ref:`session_adding` - at :ref:`session_basics`

    """

    if self._warn_on_events:
        self._flush_warning("Session.add_all()")

    add_one = self.add
    for item in instances:
        # the warning, if any, was already issued above
        add_one(item, _warn=False)

3502 

def _save_or_update_state(self, state: InstanceState[Any]) -> None:
    """Save or update ``state`` and everything it cascades to.

    The state's ``_orphaned_outside_of_session`` flag is cleared, the
    state goes through ``_save_or_update_impl()``, and so does every
    state yielded by the mapper's ``"save-update"`` cascade.
    """
    state._orphaned_outside_of_session = False
    self._save_or_update_impl(state)

    # The cascade is consumed lazily, with self._contains_state passed as
    # ``halt_on``.  NOTE(review): presumably this stops the cascade from
    # descending into objects already in the session -- confirm against
    # Mapper.cascade_iterator.
    cascade = _state_mapper(state).cascade_iterator(
        "save-update", state, halt_on=self._contains_state
    )
    for _obj, _mapper, related_state, _dict in cascade:
        self._save_or_update_impl(related_state)

3512 

def delete(self, instance: object) -> None:
    """Mark an instance as deleted.

    The object is expected to be :term:`persistent` or :term:`detached`
    when passed in.  After this call it remains in the
    :term:`persistent` state until the next flush, and meanwhile is also
    a member of the :attr:`_orm.Session.deleted` collection.

    At the next flush the object moves to the :term:`deleted` state,
    meaning a ``DELETE`` statement was emitted for its row within the
    current transaction.  Once the transaction is committed
    successfully, the deleted object moves to the :term:`detached` state
    and is no longer present within this :class:`_orm.Session`.

    :param instance: the mapped instance to delete.

    :raises sqlalchemy.orm.exc.UnmappedInstanceError: ``instance`` has no
      instrumentation state.

    .. seealso::

        :ref:`session_deleting` - at :ref:`session_basics`

    """
    if self._warn_on_events:
        self._flush_warning("Session.delete()")

    try:
        instance_state = attributes.instance_state(instance)
    except exc.NO_STATE as err:
        raise exc.UnmappedInstanceError(instance) from err

    self._delete_impl(instance_state, instance, head=True)

3543 

3544 def _delete_impl( 

3545 self, state: InstanceState[Any], obj: object, head: bool 

3546 ) -> None: 

3547 if state.key is None: 

3548 if head: 

3549 raise sa_exc.InvalidRequestError( 

3550 "Instance '%s' is not persisted" % state_str(state) 

3551 ) 

3552 else: 

3553 return 

3554 

3555 to_attach = self._before_attach(state, obj) 

3556 

3557 if state in self._deleted: 

3558 return 

3559 

3560 self.identity_map.add(state) 

3561 

3562 if to_attach: 

3563 self._after_attach(state, obj) 

3564 

3565 if head: 

3566 # grab the cascades before adding the item to the deleted list 

3567 # so that autoflush does not delete the item 

3568 # the strong reference to the instance itself is significant here 

3569 cascade_states = list( 

3570 state.manager.mapper.cascade_iterator("delete", state) 

3571 ) 

3572 else: 

3573 cascade_states = None 

3574 

3575 self._deleted[state] = obj 

3576 

3577 if head: 

3578 if TYPE_CHECKING: 

3579 assert cascade_states is not None 

3580 for o, m, st_, dct_ in cascade_states: 

3581 self._delete_impl(st_, o, False) 

3582 

def get(
    self,
    entity: _EntityBindKey[_O],
    ident: _PKIdentityArgument,
    *,
    options: Optional[Sequence[ORMOption]] = None,
    populate_existing: bool = False,
    with_for_update: ForUpdateParameter = None,
    identity_token: Optional[Any] = None,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
) -> Optional[_O]:
    """Look up an instance by primary key identifier, returning
    ``None`` when no such object is found.

    E.g.::

        my_user = session.get(User, 5)

        some_object = session.get(VersionedFoo, (5, 10))

        some_object = session.get(VersionedFoo, {"id": 5, "version_id": 10})

    .. versionadded:: 1.4 Added :meth:`_orm.Session.get`, which is moved
       from the now legacy :meth:`_orm.Query.get` method.

    :meth:`_orm.Session.get` is special in that it has direct access
    to the identity map of the :class:`.Session`.  When the given
    primary key identifier is already present in the local identity
    map, the object is returned straight from that collection and no
    SQL is emitted, unless the object has been marked fully expired.
    When it is not present, a SELECT is performed to locate the object.

    :meth:`_orm.Session.get` also checks whether the object is present
    in the identity map but marked as expired; in that case a SELECT is
    emitted to refresh the object and to ensure that the row is still
    present.  If the row is gone,
    :class:`~sqlalchemy.orm.exc.ObjectDeletedError` is raised.

    :param entity: a mapped class or :class:`.Mapper` indicating the
     type of entity to be loaded.

    :param ident: A scalar, tuple, or dictionary representing the
     primary key.  For a composite (e.g. multiple column) primary key,
     a tuple or dictionary should be passed.

     For a single-column primary key, the scalar calling form is
     typically the most expedient.  If the primary key of a row is the
     value "5", the call looks like::

        my_object = session.get(SomeClass, 5)

     The tuple form contains primary key values typically in the order
     in which they correspond to the mapped :class:`_schema.Table`
     object's primary key columns, or, if the
     :paramref:`_orm.Mapper.primary_key` configuration parameter were
     used, in the order used for that parameter.  For example, if the
     primary key of a row is represented by the integer digits "5, 10"
     the call would look like::

        my_object = session.get(SomeClass, (5, 10))

     The dictionary form should include as keys the mapped attribute
     names corresponding to each element of the primary key.  If the
     mapped class has the attributes ``id``, ``version_id`` as the
     attributes which store the object's primary key value, the call
     would look like::

        my_object = session.get(SomeClass, {"id": 5, "version_id": 10})

    :param options: optional sequence of loader options which will be
     applied to the query, if one is emitted.

    :param populate_existing: causes the method to unconditionally emit
     a SQL query and refresh the object with the newly loaded data,
     regardless of whether or not the object is already present.

    :param with_for_update: optional boolean ``True`` indicating FOR
     UPDATE should be used, or may be a dictionary containing flags to
     indicate a more specific set of FOR UPDATE flags for the SELECT;
     flags should match the parameters of
     :meth:`_query.Query.with_for_update`.
     Supersedes the :paramref:`.Session.refresh.lockmode` parameter.

    :param execution_options: optional dictionary of execution options,
     which will be associated with the query execution if one is
     emitted.  This dictionary can provide a subset of the options that
     are accepted by :meth:`_engine.Connection.execution_options`, and
     may also provide additional options understood only in an ORM
     context.

     .. versionadded:: 1.4.29

     .. seealso::

        :ref:`orm_queryguide_execution_options` - ORM-specific execution
        options

    :param bind_arguments: dictionary of additional arguments to
     determine the bind.  May include "mapper", "bind", or other custom
     arguments.  Contents of this dictionary are passed to the
     :meth:`.Session.get_bind` method.

     .. versionadded:: 2.0.0rc1

    :return: The object instance, or ``None``.

    """  # noqa: E501
    # all of the work happens in _get_impl(); get() only selects the
    # loader function used when a SELECT is required.
    found = self._get_impl(
        entity,
        ident,
        loading.load_on_pk_identity,
        options=options,
        populate_existing=populate_existing,
        with_for_update=with_for_update,
        identity_token=identity_token,
        execution_options=execution_options,
        bind_arguments=bind_arguments,
    )
    return found

3705 

def get_one(
    self,
    entity: _EntityBindKey[_O],
    ident: _PKIdentityArgument,
    *,
    options: Optional[Sequence[ORMOption]] = None,
    populate_existing: bool = False,
    with_for_update: ForUpdateParameter = None,
    identity_token: Optional[Any] = None,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
) -> _O:
    """Load the single instance identified by the given primary key,
    raising an exception when it cannot be found.

    Raises :class:`_exc.NoResultFound` if the query selects no rows.

    The arguments are passed through unchanged to
    :meth:`.Session.get`; see that method for their detailed
    documentation.

    .. versionadded:: 2.0.22

    :return: The object instance.

    .. seealso::

        :meth:`.Session.get` - equivalent method that instead
        returns ``None`` if no row was found with the provided primary
        key

    """
    found = self.get(
        entity,
        ident,
        options=options,
        populate_existing=populate_existing,
        with_for_update=with_for_update,
        identity_token=identity_token,
        execution_options=execution_options,
        bind_arguments=bind_arguments,
    )

    if found is not None:
        return found

    # get() reports "not found" as None; turn that into an error here
    raise sa_exc.NoResultFound("No row was found when one was required")

3755 

def _get_impl(
    self,
    entity: _EntityBindKey[_O],
    primary_key_identity: _PKIdentityArgument,
    db_load_fn: Callable[..., _O],
    *,
    options: Optional[Sequence[ExecutableOption]] = None,
    populate_existing: bool = False,
    with_for_update: ForUpdateParameter = None,
    identity_token: Optional[Any] = None,
    execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
    bind_arguments: Optional[_BindArguments] = None,
) -> Optional[_O]:
    """Normalize a primary key identifier and return the matching
    instance, from the identity map if possible, else via ``db_load_fn``.

    :param entity: mapped class or mapper; it is passed to ``inspect()``
     and must resolve to a mapper.
    :param primary_key_identity: scalar, sequence, dictionary or
     composite object holding the primary key value(s).
    :param db_load_fn: callable invoked as
     ``db_load_fn(self, statement, primary_key_identity, ...)`` when the
     identity map lookup is skipped or yields nothing.
    :param options: loader options applied to the SELECT statement.
    :param populate_existing: when true, the identity map lookup is
     skipped and ``_populate_existing`` is added to the load options.
    :param with_for_update: when not ``None``, the identity map lookup
     is skipped and the value is converted with
     ``ForUpdateArg._from_argument()`` for the statement.
    :param identity_token: passed through to the identity lookup and to
     ``db_load_fn``.
    :param execution_options: passed through to the identity lookup and
     to ``db_load_fn``.
    :param bind_arguments: passed through to the identity lookup and to
     ``db_load_fn``.

    :return: the instance, or ``None``.

    :raises ArgumentError: if ``entity`` does not inspect to a mapper.
    :raises InvalidRequestError: if the identifier has the wrong number
     of values, or a dictionary identifier lacks a primary key
     attribute name.

    """
    # convert composite types to individual args
    if (
        is_composite_class(primary_key_identity)
        and type(primary_key_identity)
        in descriptor_props._composite_getters
    ):
        getter = descriptor_props._composite_getters[
            type(primary_key_identity)
        ]
        primary_key_identity = getter(primary_key_identity)

    mapper: Optional[Mapper[_O]] = inspect(entity)

    if mapper is None or not mapper.is_mapper:
        raise sa_exc.ArgumentError(
            "Expected mapped class or mapper, got: %r" % entity
        )

    # anything that is not a dictionary is coerced to a list of values
    is_dict = isinstance(primary_key_identity, dict)
    if not is_dict:
        primary_key_identity = util.to_list(
            primary_key_identity, default=[None]
        )

    if len(primary_key_identity) != len(mapper.primary_key):
        raise sa_exc.InvalidRequestError(
            "Incorrect number of values in identifier to formulate "
            "primary key for session.get(); primary key columns "
            "are %s" % ",".join("'%s'" % c for c in mapper.primary_key)
        )

    if is_dict:
        pk_synonyms = mapper._pk_synonyms

        if pk_synonyms:
            # keys given under a synonym name
            correct_keys = set(pk_synonyms).intersection(
                primary_key_identity
            )

            if correct_keys:
                # work on a copy so the caller's dictionary is not
                # mutated, then add each value under the name the
                # synonym maps to
                primary_key_identity = dict(primary_key_identity)
                for k in correct_keys:
                    primary_key_identity[pk_synonyms[k]] = (
                        primary_key_identity[k]
                    )

        # flatten the dictionary into a list ordered like
        # mapper._identity_key_props; a missing name raises KeyError
        try:
            primary_key_identity = list(
                primary_key_identity[prop.key]
                for prop in mapper._identity_key_props
            )

        except KeyError as err:
            raise sa_exc.InvalidRequestError(
                "Incorrect names of values in identifier to formulate "
                "primary key for session.get(); primary key attribute "
                "names are %s (synonym names are also accepted)"
                % ",".join(
                    "'%s'" % prop.key
                    for prop in mapper._identity_key_props
                )
            ) from err

    # the identity map is consulted only when nothing forces a SELECT
    if (
        not populate_existing
        and not mapper.always_refresh
        and with_for_update is None
    ):
        instance = self._identity_lookup(
            mapper,
            primary_key_identity,
            identity_token=identity_token,
            execution_options=execution_options,
            bind_arguments=bind_arguments,
        )

        if instance is not None:
            # reject calls for id in identity map but class
            # mismatch.
            if not isinstance(instance, mapper.class_):
                return None
            return instance

        # TODO: this was being tested before, but this is not possible
        assert instance is not LoaderCallableStatus.PASSIVE_CLASS_MISMATCH

    # set_label_style() not strictly necessary, however this will ensure
    # that tablename_colname style is used which at the moment is
    # asserted in a lot of unit tests :)

    load_options = context.QueryContext.default_load_options

    if populate_existing:
        load_options += {"_populate_existing": populate_existing}
    statement = sql.select(mapper).set_label_style(
        LABEL_STYLE_TABLENAME_PLUS_COL
    )
    if with_for_update is not None:
        statement._for_update_arg = ForUpdateArg._from_argument(
            with_for_update
        )

    if options:
        statement = statement.options(*options)
    return db_load_fn(
        self,
        statement,
        primary_key_identity,
        load_options=load_options,
        identity_token=identity_token,
        execution_options=execution_options,
        bind_arguments=bind_arguments,
    )

3882 

def merge(
    self,
    instance: _O,
    *,
    load: bool = True,
    options: Optional[Sequence[ORMOption]] = None,
) -> _O:
    """Copy the state of the given instance onto the corresponding
    instance within this :class:`.Session`, and return that instance.

    :meth:`.Session.merge` looks at the primary key attributes of the
    source instance and tries to reconcile it with an instance of the
    same primary key in the session.  If none is found locally, it
    attempts to load the object from the database based on primary
    key, and if none can be located, a new instance is created.  The
    state of each attribute on the source instance is then copied to
    the target instance, which is what the method returns.  The
    original source instance is left unmodified, and un-associated
    with the :class:`.Session` if not already.

    This operation cascades to associated instances if the association
    is mapped with ``cascade="merge"``.

    See :ref:`unitofwork_merging` for a detailed discussion of merging.

    :param instance: Instance to be merged.
    :param load: Boolean, when False, :meth:`.merge` switches into
     a "high performance" mode which causes it to forego emitting
     history events as well as all database access.  This flag is used
     for cases such as transferring graphs of objects into a
     :class:`.Session` from a second level cache, or to transfer
     just-loaded objects into the :class:`.Session` owned by a worker
     thread or process without re-querying the database.

     The ``load=False`` use case adds the caveat that the given object
     has to be in a "clean" state, that is, has no pending changes to
     be flushed - even if the incoming object is detached from any
     :class:`.Session`.  This is so that when the merge operation
     populates local attributes and cascades to related objects and
     collections, the values can be "stamped" onto the target object
     as is, without generating any history or attribute events, and
     without the need to reconcile the incoming data with any existing
     related objects or collections that might not be loaded.  The
     resulting objects from ``load=False`` are always produced as
     "clean", so it is only appropriate that the given objects should
     be "clean" as well, else this suggests a mis-use of the method.
    :param options: optional sequence of loader options which will be
     applied to the :meth:`_orm.Session.get` method when the merge
     operation loads the existing version of the object from the
     database.

     .. versionadded:: 1.4.24


    .. seealso::

        :func:`.make_transient_to_detached` - provides for an
        alternative means of "merging" a single object into the
        :class:`.Session`

    """
    if self._warn_on_events:
        self._flush_warning("Session.merge()")

    # bookkeeping shared by the whole recursive merge: source states
    # already merged, and merged objects keyed by identity key
    seen_states: Dict[InstanceState[Any], object] = {}
    merged_by_key: Dict[_IdentityKeyType[Any], object] = {}

    if load:
        # flush current contents if we expect to load data
        self._autoflush()

    object_mapper(instance)  # verify mapped

    # autoflush stays off while the merge runs and is put back to its
    # previous value afterwards, whether or not the merge succeeds
    previous_autoflush = self.autoflush
    try:
        self.autoflush = False
        return self._merge(
            attributes.instance_state(instance),
            attributes.instance_dict(instance),
            load=load,
            options=options,
            _recursive=seen_states,
            _resolve_conflict_map=merged_by_key,
        )
    finally:
        self.autoflush = previous_autoflush

3969 

def _merge(
    self,
    state: InstanceState[_O],
    state_dict: _InstanceDict,
    *,
    options: Optional[Sequence[ORMOption]] = None,
    load: bool,
    _recursive: Dict[Any, object],
    _resolve_conflict_map: Dict[_IdentityKeyType[Any], object],
) -> _O:
    """Merge one source state into this session and return the target
    object; the recursive worker behind :meth:`.Session.merge`.

    :param state: instance state of the source object.
    :param state_dict: instance dictionary of the source object.
    :param options: loader options passed to :meth:`.Session.get` when
     the existing object has to be loaded.
    :param load: when false, no database access is made; the merged
     state's history is committed at the end instead.
    :param _recursive: maps source states already handled to their
     merged objects; a state found here is returned immediately.
    :param _resolve_conflict_map: maps identity keys to merged objects
     produced earlier in the same merge operation.

    :return: the merged object.

    :raises InvalidRequestError: with ``load=False``, if the source has
     no identity key, or if it is modified and not already present in
     the identity map or the conflict map.
    :raises StaleDataError: if both objects carry a version id and the
     two values differ.

    """
    mapper: Mapper[_O] = _state_mapper(state)
    if state in _recursive:
        return cast(_O, _recursive[state])

    new_instance = False
    key = state.key

    merged: Optional[_O]

    if key is None:
        if state in self._new:
            util.warn(
                "Instance %s is already pending in this Session yet is "
                "being merged again; this is probably not what you want "
                "to do" % state_str(state)
            )

        if not load:
            raise sa_exc.InvalidRequestError(
                "merge() with load=False option does not support "
                "objects transient (i.e. unpersisted) objects.  flush() "
                "all changes on mapped instances before merging with "
                "load=False."
            )
        key = mapper._identity_key_from_state(state)
        # key[1] holds the primary key values.  the key is usable only
        # if no value is NEVER_SET and either no value is in _none_set,
        # or partial primary keys are allowed and at least one value is
        # outside _none_set.
        key_is_persistent = LoaderCallableStatus.NEVER_SET not in key[
            1
        ] and (
            not _none_set.intersection(key[1])
            or (
                mapper.allow_partial_pks
                and not _none_set.issuperset(key[1])
            )
        )
    else:
        key_is_persistent = True

    merged = self.identity_map.get(key)

    if merged is None:
        if key_is_persistent and key in _resolve_conflict_map:
            # same identity already merged earlier in this operation
            merged = cast(_O, _resolve_conflict_map[key])

        elif not load:
            if state.modified:
                raise sa_exc.InvalidRequestError(
                    "merge() with load=False option does not support "
                    "objects marked as 'dirty'.  flush() all changes on "
                    "mapped instances before merging with load=False."
                )
            # no database access: make a blank instance, give it the
            # source's key and hand it to _update_impl()
            merged = mapper.class_manager.new_instance()
            merged_state = attributes.instance_state(merged)
            merged_state.key = key
            self._update_impl(merged_state)
            new_instance = True

        elif key_is_persistent:
            merged = self.get(
                mapper.class_,
                key[1],
                identity_token=key[2],
                options=options,
            )

    if merged is None:
        # nothing found by any of the lookups above; create a new
        # object and register it with the session
        merged = mapper.class_manager.new_instance()
        merged_state = attributes.instance_state(merged)
        merged_dict = attributes.instance_dict(merged)
        new_instance = True
        self._save_or_update_state(merged_state)
    else:
        merged_state = attributes.instance_state(merged)
        merged_dict = attributes.instance_dict(merged)

    # record the result before merging properties below, which
    # receive both maps
    _recursive[state] = merged
    _resolve_conflict_map[key] = merged

    # check that we didn't just pull the exact same
    # state out.
    if state is not merged_state:
        # version check if applicable
        if mapper.version_id_col is not None:
            existing_version = mapper._get_state_attr_by_column(
                state,
                state_dict,
                mapper.version_id_col,
                passive=PassiveFlag.PASSIVE_NO_INITIALIZE,
            )

            merged_version = mapper._get_state_attr_by_column(
                merged_state,
                merged_dict,
                mapper.version_id_col,
                passive=PassiveFlag.PASSIVE_NO_INITIALIZE,
            )

            # compare only when both sides returned a version value
            if (
                existing_version
                is not LoaderCallableStatus.PASSIVE_NO_RESULT
                and merged_version
                is not LoaderCallableStatus.PASSIVE_NO_RESULT
                and existing_version != merged_version
            ):
                raise exc.StaleDataError(
                    "Version id '%s' on merged state %s "
                    "does not match existing version '%s'. "
                    "Leave the version attribute unset when "
                    "merging to update the most recent version."
                    % (
                        existing_version,
                        state_str(merged_state),
                        merged_version,
                    )
                )

        merged_state.load_path = state.load_path
        merged_state.load_options = state.load_options

        # since we are copying load_options, we need to copy
        # the callables_ that would have been generated by those
        # load_options.
        # assumes that the callables we put in state.callables_
        # are not instance-specific (which they should not be)
        merged_state._copy_callables(state)

        # each mapped property copies its own value
        for prop in mapper.iterate_properties:
            prop.merge(
                self,
                state,
                state_dict,
                merged_state,
                merged_dict,
                load,
                _recursive,
                _resolve_conflict_map,
            )

    if not load:
        # remove any history
        merged_state._commit_all(merged_dict, self.identity_map)
        merged_state.manager.dispatch._sa_event_merge_wo_load(
            merged_state, None
        )

    if new_instance:
        # objects created in this call get the "load" event
        merged_state.manager.dispatch.load(merged_state, None)

    return merged

4128 

def _validate_persistent(self, state: InstanceState[Any]) -> None:
    """Raise ``InvalidRequestError`` unless ``state`` is contained in
    this session's identity map."""
    if self.identity_map.contains_state(state):
        return

    raise sa_exc.InvalidRequestError(
        "Instance '%s' is not persistent within this Session"
        % state_str(state)
    )

4135 

def _save_impl(self, state: InstanceState[Any]) -> None:
    """Register ``state`` in the session's collection of new objects.

    :raises InvalidRequestError: if ``state`` already has an identity
     key.

    """
    if state.key is not None:
        raise sa_exc.InvalidRequestError(
            "Object '%s' already has an identity - "
            "it can't be registered as pending" % state_str(state)
        )

    instance = state.obj()
    needs_attach = self._before_attach(state, instance)

    pending = self._new
    if state not in pending:
        pending[state] = instance
        # the size of the collection after adding this state
        state.insert_order = len(pending)

    if needs_attach:
        self._after_attach(state, instance)

4150 

def _update_impl(
    self, state: InstanceState[Any], revert_deletion: bool = False
) -> None:
    """Place a state that has an identity key into the identity map.

    :param state: instance state to register; it must have a key.
    :param revert_deletion: when true, a state flagged as deleted is
     accepted (its ``_deleted`` flag is removed) and the state replaces
     whatever the identity map holds for it, instead of being added.

    :raises InvalidRequestError: if ``state`` has no key, or if it is
     flagged as deleted and ``revert_deletion`` is false.

    """
    if state.key is None:
        raise sa_exc.InvalidRequestError(
            "Instance '%s' is not persisted" % state_str(state)
        )

    if state._deleted:
        if not revert_deletion:
            raise sa_exc.InvalidRequestError(
                "Instance '%s' has been deleted.  "
                "Use the make_transient() "
                "function to send this object back "
                "to the transient state." % state_str(state)
            )
        if not state._attached:
            return
        del state._deleted

    instance = state.obj()

    # check for late gc
    if instance is None:
        return

    needs_attach = self._before_attach(state, instance)

    self._deleted.pop(state, None)

    register = (
        self.identity_map.replace
        if revert_deletion
        else self.identity_map.add
    )
    register(state)

    if needs_attach:
        self._after_attach(state, instance)
    elif revert_deletion:
        self.dispatch.deleted_to_persistent(self, state)

4190 

def _save_or_update_impl(self, state: InstanceState[Any]) -> None:
    """Dispatch to ``_save_impl()`` for a state without an identity
    key, or to ``_update_impl()`` for a state that has one."""
    handler = self._save_impl if state.key is None else self._update_impl
    handler(state)

4196 

def enable_relationship_loading(self, obj: object) -> None:
    """Attach an object to this :class:`.Session` so that its related
    objects can be loaded.

    .. warning::

        :meth:`.enable_relationship_loading` exists to serve special
        use cases and is not recommended for general use.

    Once associated, accessing attributes mapped with
    :func:`_orm.relationship` will attempt to load a value from the
    database using this :class:`.Session` as the source of
    connectivity.  Values are loaded based on the foreign key and
    primary key values present on this object - if those are not
    present, the relationships will be unavailable.

    The object will be attached to this session, but will **not**
    participate in any persistence operations; for almost all purposes
    its state remains either "transient" or "detached", the exception
    being relationship loading.

    Also note that backrefs will often not work as expected.  Altering
    a relationship-bound attribute on the target object may not fire
    off a backref event, if the effective value is what was already
    loaded from a foreign-key-holding value.

    The :meth:`.Session.enable_relationship_loading` method is similar
    to the ``load_on_pending`` flag on :func:`_orm.relationship`.
    Unlike that flag, :meth:`.Session.enable_relationship_loading`
    allows an object to remain transient while still being able to
    load related items.

    To make a transient object associated with a :class:`.Session`
    via :meth:`.Session.enable_relationship_loading` pending, add it
    to the :class:`.Session` using :meth:`.Session.add` normally.  If
    the object instead represents an existing identity in the
    database, it should be merged using :meth:`.Session.merge`.

    :meth:`.Session.enable_relationship_loading` does not improve
    behavior when the ORM is used normally - object references should
    be constructed at the object level, not at the foreign key level,
    so that they are present in an ordinary way before flush()
    proceeds.  This method is not intended for general use.

    .. seealso::

        :paramref:`_orm.relationship.load_on_pending` - this flag
        allows per-relationship loading of many-to-ones on items that
        are pending.

        :func:`.make_transient_to_detached` - allows for an object to
        be added to a :class:`.Session` without SQL emitted, which then
        will unexpire attributes on access.

    """
    try:
        obj_state = attributes.instance_state(obj)
    except exc.NO_STATE as err:
        raise exc.UnmappedInstanceError(obj) from err

    needs_attach = self._before_attach(obj_state, obj)
    obj_state._load_pending = True
    if not needs_attach:
        return
    self._after_attach(obj_state, obj)

4261 

def _before_attach(self, state: InstanceState[Any], obj: object) -> bool:
    """Run the checks and event that precede attaching ``state``.

    :return: ``False`` if ``state`` already carries this session's
     ``hash_key`` as its ``session_id``; otherwise ``True``, after the
     ``before_attach`` event has been dispatched.

    :raises InvalidRequestError: if ``state.session_id`` names another
     session that is present in ``_sessions``.

    """
    self._autobegin_t()

    owner_id = state.session_id
    if owner_id == self.hash_key:
        return False

    if owner_id and owner_id in _sessions:
        raise sa_exc.InvalidRequestError(
            "Object '%s' is already attached to session '%s' "
            "(this is '%s')"
            % (state_str(state), owner_id, self.hash_key)
        )

    self.dispatch.before_attach(self, state)
    return True

4278 

def _after_attach(self, state: InstanceState[Any], obj: object) -> None:
    """Stamp ``state`` with this session's ``hash_key`` and dispatch
    the attachment events.

    ``after_attach`` is dispatched first, followed by
    ``detached_to_persistent`` if the state has a key, else by
    ``transient_to_pending``.

    """
    state.session_id = self.hash_key

    # a modified state without a strong reference gets one to ``obj``
    if state.modified:
        if state._strong_obj is None:
            state._strong_obj = obj

    events = self.dispatch
    events.after_attach(self, state)

    transition = (
        events.detached_to_persistent
        if state.key
        else events.transient_to_pending
    )
    transition(self, state)

4289 

def __contains__(self, instance: object) -> bool:
    """Report whether the given instance is associated with this
    session.

    A result of True means the instance is either pending or
    persistent within the Session.

    """
    try:
        inst_state = attributes.instance_state(instance)
    except exc.NO_STATE as err:
        raise exc.UnmappedInstanceError(instance) from err
    else:
        return self._contains_state(inst_state)

4302 

def __iter__(self) -> Iterator[object]:
    """Return an iterator over every pending or persistent instance
    within this Session.

    """
    # both collections are copied into one list up front; the values of
    # ``_new`` come first, then those of the identity map
    snapshot = [*self._new.values(), *self.identity_map.values()]
    return iter(snapshot)

4311 

def _contains_state(self, state: InstanceState[Any]) -> bool:
    """Return whether ``state`` is in the collection of new objects or
    in the identity map."""
    if state in self._new:
        return True
    return self.identity_map.contains_state(state)

4314 

def flush(self, objects: Optional[Sequence[Any]] = None) -> None:
    """Write all pending object changes out to the database.

    All pending object creations, deletions and modifications are
    emitted to the database as INSERTs, DELETEs, UPDATEs, etc.  The
    operations are ordered automatically by the Session's unit of work
    dependency solver.

    Database operations are issued in the current transactional
    context and do not affect the state of the transaction, unless an
    error occurs, in which case the entire transaction is rolled back.
    You may flush() as often as you like within a transaction to move
    changes from Python to the database's transaction buffer.

    :param objects: Optional; restricts the flush operation to operate
     only on elements that are in the given collection.

     This feature is for an extremely narrow set of use cases where
     particular objects may need to be operated upon before the
     full flush() occurs.  It is not intended for general use.

    """
    if self._flushing:
        raise sa_exc.InvalidRequestError("Session is already flushing")

    # when the session is clean there is nothing to do
    if not self._is_clean():
        # the flag is reset whether or not _flush() succeeds
        self._flushing = True
        try:
            self._flush(objects)
        finally:
            self._flushing = False

4348 

def _flush_warning(self, method: Any) -> None:
    """Emit a warning that ``method`` was used within the execution
    stage of the flush process, where it is not supported.

    :param method: name of the operation, interpolated into the
     warning text.

    """
    message = (
        "Usage of the '%s' operation is not currently supported "
        "within the execution stage of the flush process. "
        "Results may not be consistent.  Consider using alternative "
        "event listeners or connection-level operations instead." % method
    )
    util.warn(message)

4356 

def _is_clean(self) -> bool:
    """Return True when the identity map reports no modifications and
    there are no deleted and no new objects."""
    return not (
        self.identity_map.check_modified() or self._deleted or self._new
    )

4363 

def _flush(self, objects: Optional[Sequence[object]] = None) -> None:
    """Register the session's new, dirty and deleted states with a
    ``UOWTransaction`` and execute it inside a transaction.

    :param objects: optional collection of mapped objects; when given
     and non-empty, only states belonging to these objects are
     registered.

    :raises UnmappedInstanceError: if an element of ``objects`` has no
     instance state.

    Any exception raised once the transaction has begun causes it to
    be rolled back before the exception is re-raised.

    """
    dirty = self._dirty_states
    if not dirty and not self._deleted and not self._new:
        # nothing to flush; clear the identity map's modified set
        self.identity_map._modified.clear()
        return

    flush_context = UOWTransaction(self)

    if self.dispatch.before_flush:
        self.dispatch.before_flush(self, flush_context, objects)
        # re-establish "dirty states" in case the listeners
        # added to them
        dirty = self._dirty_states

    # snapshot the state collections as sets
    deleted = set(self._deleted)
    new = set(self._new)

    # a state that is both dirty and deleted is handled as deleted
    dirty = set(dirty).difference(deleted)

    # create the set of all objects we want to operate upon
    if objects:
        # specific list passed in
        objset = set()
        for o in objects:
            try:
                state = attributes.instance_state(o)

            except exc.NO_STATE as err:
                raise exc.UnmappedInstanceError(o) from err
            objset.add(state)
    else:
        objset = None

    # store objects whose fate has been decided
    processed = set()

    # put all saves/updates into the flush context.  detect top-level
    # orphans and throw them into deleted.
    if objset:
        proc = new.union(dirty).intersection(objset).difference(deleted)
    else:
        proc = new.union(dirty).difference(deleted)

    for state in proc:
        is_orphan = _state_mapper(state)._is_orphan(state)

        is_persistent_orphan = is_orphan and state.has_identity

        if (
            is_orphan
            and not is_persistent_orphan
            and state._orphaned_outside_of_session
        ):
            # an orphan without identity that was orphaned outside of
            # the session is expunged rather than flushed
            self._expunge_states([state])
        else:
            # persistent orphans are registered as deletes
            _reg = flush_context.register_object(
                state, isdelete=is_persistent_orphan
            )
            assert _reg, "Failed to add object to the flush context!"
            processed.add(state)

    # put all remaining deletes into the flush context.
    if objset:
        proc = deleted.intersection(objset).difference(processed)
    else:
        proc = deleted.difference(processed)
    for state in proc:
        _reg = flush_context.register_object(state, isdelete=True)
        assert _reg, "Failed to add object to the flush context!"

    if not flush_context.has_work:
        return

    flush_context.transaction = transaction = self._autobegin_t()._begin()
    try:
        # Session.merge() checks this flag and emits a warning if it
        # is called while the flush is executing
        self._warn_on_events = True
        try:
            flush_context.execute()
        finally:
            self._warn_on_events = False

        self.dispatch.after_flush(self, flush_context)

        flush_context.finalize_flush_changes()

        if not objects and self.identity_map._modified:
            # states are still marked modified after a full flush;
            # commit them all and warn that those changes are reset
            len_ = len(self.identity_map._modified)

            statelib.InstanceState._commit_all_states(
                [
                    (state, state.dict)
                    for state in self.identity_map._modified
                ],
                instance_dict=self.identity_map,
            )
            util.warn(
                "Attribute history events accumulated on %d "
                "previously clean instances "
                "within inner-flush event handlers have been "
                "reset, and will not result in database updates. "
                "Consider using set_committed_value() within "
                "inner-flush event handlers to avoid this warning." % len_
            )

        # useful assertions:
        # if not objects:
        #    assert not self.identity_map._modified
        # else:
        #    assert self.identity_map._modified == \
        #              self.identity_map._modified.difference(objects)

        self.dispatch.after_flush_postexec(self, flush_context)

        transaction.commit()

    except:
        # the bare except is paired with safe_reraise(), which
        # re-raises the original exception once the rollback is done
        with util.safe_reraise():
            transaction.rollback(_capture_exception=True)

4482 

def bulk_save_objects(
    self,
    objects: Iterable[object],
    return_defaults: bool = False,
    update_changed_only: bool = True,
    preserve_order: bool = True,
) -> None:
    """Persist the given mapped objects in bulk.

    .. legacy::

        This method is a legacy feature as of the 2.0 series of
        SQLAlchemy.   For modern bulk INSERT and UPDATE, see
        the sections :ref:`orm_queryguide_bulk_insert` and
        :ref:`orm_queryguide_bulk_update`.

        For general INSERT and UPDATE of existing ORM mapped objects,
        prefer standard :term:`unit of work` data management patterns,
        introduced in the :ref:`unified_tutorial` at
        :ref:`tutorial_orm_data_manipulation`.  SQLAlchemy 2.0
        now uses :ref:`engine_insertmanyvalues` with modern dialects
        which solves previous issues of bulk INSERT slowness.

    :param objects: a sequence of mapped object instances.  The objects
     are persisted as they are and are **not** associated with the
     :class:`.Session` afterwards.

     Whether a given object is emitted as an INSERT or an UPDATE follows
     the same rules the :class:`.Session` uses in traditional operation:
     if the object has the :attr:`.InstanceState.key` attribute set, it
     is assumed to be "detached" and results in an UPDATE.  Otherwise an
     INSERT is used.

     For UPDATE, statements are grouped according to which attributes
     have changed, as these become the subject of each SET clause.  If
     ``update_changed_only`` is False, all attributes present within
     each object are applied to the UPDATE statement, which may help the
     statements group together into a larger executemany(), and also
     reduces the overhead of checking history on attributes.

    :param return_defaults: when True, rows that are missing values which
     generate defaults, namely integer primary key defaults and
     sequences, will be inserted **one at a time**, so that the primary
     key value is available.  In particular this allows
     joined-inheritance and other multi-table mappings to insert
     correctly without the need to provide primary key values ahead of
     time; however,
     :paramref:`.Session.bulk_save_objects.return_defaults` **greatly
     reduces the performance gains** of the method overall.  It is
     strongly advised to please use the standard
     :meth:`_orm.Session.add_all` approach.

    :param update_changed_only: when True, UPDATE statements are rendered
     based on those attributes in each state that have logged changes.
     When False, all attributes present are rendered into the SET clause
     with the exception of primary key attributes.

    :param preserve_order: when True, the order of inserts and updates
     matches exactly the order in which the objects are given.  When
     False, common types of objects are grouped into inserts
     and updates, to allow for more batching opportunities.

    .. seealso::

        :doc:`queryguide/dml`

        :meth:`.Session.bulk_insert_mappings`

        :meth:`.Session.bulk_update_mappings`

    """

    def _persistence_group(
        state: InstanceState[_O],
    ) -> Tuple[Mapper[_O], bool]:
        # second element is True when the state already carries an
        # identity key, which selects the UPDATE path for the group.
        return (state.mapper, state.key is not None)

    # kept lazy: states are only resolved as groupby()/sorted() pull them
    pending: Iterable[InstanceState[Any]] = (
        attributes.instance_state(instance) for instance in objects
    )

    if not preserve_order:
        # this sort exists only to place states with the same mapper
        # and the same insert/update disposition next to one another so
        # that groupby() yields one group per kind; it makes no attempt
        # at determinism beyond that.
        pending = sorted(
            pending,
            key=lambda st: (id(st.mapper), st.key is not None),
        )

    grouped = itertools.groupby(pending, _persistence_group)
    for (mapper, isupdate), states in grouped:
        self._bulk_save_mappings(
            mapper,
            states,
            isstates=True,
            isupdate=isupdate,
            render_nulls=False,
            return_defaults=return_defaults,
            update_changed_only=update_changed_only,
        )

4587 

def bulk_insert_mappings(
    self,
    mapper: Mapper[Any],
    mappings: Iterable[Dict[str, Any]],
    return_defaults: bool = False,
    render_nulls: bool = False,
) -> None:
    """Bulk INSERT rows described by a list of mapping dictionaries.

    .. legacy::

        This method is a legacy feature as of the 2.0 series of
        SQLAlchemy.   For modern bulk INSERT and UPDATE, see
        the sections :ref:`orm_queryguide_bulk_insert` and
        :ref:`orm_queryguide_bulk_update`.  The 2.0 API shares
        implementation details with this method and adds new features
        as well.

    :param mapper: a mapped class, or the actual :class:`_orm.Mapper`
     object, representing the single kind of object represented within
     the mapping list.

    :param mappings: a sequence of dictionaries, each one containing the
     state of the mapped row to be inserted, in terms of the attribute
     names on the mapped class.   If the mapping refers to multiple
     tables, such as a joined-inheritance mapping, each dictionary must
     contain all keys to be populated into all tables.

    :param return_defaults: when True, the INSERT process will be altered
     to ensure that newly generated primary key values will be fetched.
     The rationale for this parameter is typically to enable
     :ref:`Joined Table Inheritance <joined_inheritance>` mappings to
     be bulk inserted.

     .. note:: for backends that don't support RETURNING, the
        :paramref:`_orm.Session.bulk_insert_mappings.return_defaults`
        parameter can significantly decrease performance as INSERT
        statements can no longer be batched.   See
        :ref:`engine_insertmanyvalues`
        for background on which backends are affected.

    :param render_nulls: When True, a value of ``None`` will result
     in a NULL value being included in the INSERT statement, rather
     than the column being omitted from the INSERT.   This allows all
     the rows being INSERTed to have the identical set of columns which
     allows the full set of rows to be batched to the DBAPI.  Normally,
     each column-set that contains a different combination of NULL
     values than the previous row must omit a different series of
     columns from the rendered INSERT statement, which means it must be
     emitted as a separate statement.   By passing this flag, the full
     set of rows are guaranteed to be batchable into one batch; the cost
     however is that server-side defaults which are invoked by an
     omitted column will be skipped, so care must be taken to ensure
     that these are not necessary.

     .. warning::

        When this flag is set, **server side default SQL values will
        not be invoked** for those columns that are inserted as NULL;
        the NULL value will be sent explicitly.   Care must be taken
        to ensure that no server-side default functions need to be
        invoked for the operation as a whole.

    .. seealso::

        :doc:`queryguide/dml`

        :meth:`.Session.bulk_save_objects`

        :meth:`.Session.bulk_update_mappings`

    """
    # plain dictionaries on the INSERT path: the state/update switches are
    # fixed off, only the two caller-facing flags are forwarded.
    self._bulk_save_mappings(
        mapper,
        mappings,
        isstates=False,
        isupdate=False,
        update_changed_only=False,
        render_nulls=render_nulls,
        return_defaults=return_defaults,
    )

4670 

def bulk_update_mappings(
    self, mapper: Mapper[Any], mappings: Iterable[Dict[str, Any]]
) -> None:
    """Bulk UPDATE rows described by a list of mapping dictionaries.

    .. legacy::

        This method is a legacy feature as of the 2.0 series of
        SQLAlchemy.   For modern bulk INSERT and UPDATE, see
        the sections :ref:`orm_queryguide_bulk_insert` and
        :ref:`orm_queryguide_bulk_update`.  The 2.0 API shares
        implementation details with this method and adds new features
        as well.

    :param mapper: a mapped class, or the actual :class:`_orm.Mapper`
     object, representing the single kind of object represented within
     the mapping list.

    :param mappings: a sequence of dictionaries, each one containing the
     state of the mapped row to be updated, in terms of the attribute
     names on the mapped class.   If the mapping refers to multiple
     tables, such as a joined-inheritance mapping, each dictionary may
     contain keys corresponding to all tables.   All those keys which
     are present and are not part of the primary key are applied to the
     SET clause of the UPDATE statement; the primary key values, which
     are required, are applied to the WHERE clause.


    .. seealso::

        :doc:`queryguide/dml`

        :meth:`.Session.bulk_insert_mappings`

        :meth:`.Session.bulk_save_objects`

    """
    # plain dictionaries on the UPDATE path; every optional behavior of
    # the shared implementation is switched off.
    self._bulk_save_mappings(
        mapper,
        mappings,
        isstates=False,
        isupdate=True,
        render_nulls=False,
        return_defaults=False,
        update_changed_only=False,
    )

4718 

def _bulk_save_mappings(
    self,
    mapper: Mapper[_O],
    mappings: Union[Iterable[InstanceState[_O]], Iterable[Dict[str, Any]]],
    *,
    isupdate: bool,
    isstates: bool,
    return_defaults: bool,
    update_changed_only: bool,
    render_nulls: bool,
) -> None:
    """Shared implementation behind the public ``bulk_*`` methods.

    Resolves ``mapper`` through ``_class_to_mapper()``, marks the session
    as flushing, begins a transaction and hands the rows to
    ``bulk_persistence._bulk_update()`` or
    ``bulk_persistence._bulk_insert()``.  The transaction is committed on
    success and rolled back (re-raising the original exception) on any
    failure.

    :param mapper: mapped class or mapper describing the rows.
    :param mappings: either instance states (``isstates=True``) or plain
     dictionaries (``isstates=False``).
    :param isupdate: True routes to the UPDATE implementation, False to
     the INSERT implementation.
    :param isstates: passed through to the persistence function.
    :param return_defaults: passed to the INSERT implementation only.
    :param update_changed_only: passed to the UPDATE implementation only.
    :param render_nulls: passed to the INSERT implementation only.

    """
    mapper = _class_to_mapper(mapper)
    self._flushing = True

    # the flag is raised before the transaction is begun, so beginning the
    # transaction has to live inside the try/finally as well; otherwise a
    # failure to begin would leave the session marked as flushing.
    try:
        transaction = self._autobegin_t()._begin()
        try:
            if isupdate:
                bulk_persistence._bulk_update(
                    mapper,
                    mappings,
                    transaction,
                    isstates=isstates,
                    update_changed_only=update_changed_only,
                )
            else:
                bulk_persistence._bulk_insert(
                    mapper,
                    mappings,
                    transaction,
                    isstates=isstates,
                    return_defaults=return_defaults,
                    render_nulls=render_nulls,
                )
            transaction.commit()

        except:
            # bare except is deliberate: safe_reraise() re-raises whatever
            # was in flight once the rollback has been attempted.
            with util.safe_reraise():
                transaction.rollback(_capture_exception=True)
    finally:
        self._flushing = False

4759 

def is_modified(
    self, instance: object, include_collections: bool = True
) -> bool:
    r"""Report whether the given instance has locally modified attributes.

    The history of each instrumented attribute on the instance is
    retrieved, and the current value is compared against its previously
    flushed or committed value, if any.

    This is in effect a more expensive and accurate version of checking
    for the given instance in the :attr:`.Session.dirty` collection; a
    full test for each attribute's net "dirty" status is performed.

    E.g.::

        return session.is_modified(someobject)

    A few caveats to this method apply:

    * Instances present in the :attr:`.Session.dirty` collection may
      report ``False`` when tested with this method.  This is because
      the object may have received change events via attribute mutation,
      thus placing it in :attr:`.Session.dirty`, but ultimately the state
      is the same as that loaded from the database, resulting in no net
      change here.
    * Scalar attributes may not have recorded the previously set
      value when a new value was applied, if the attribute was not
      loaded, or was expired, at the time the new value was received -
      in these cases, the attribute is assumed to have a change, even if
      there is ultimately no net change against its database value.
      SQLAlchemy in most cases does not need the "old" value when a set
      event occurs, so it skips the expense of a SQL call if the old
      value isn't present, based on the assumption that an UPDATE of the
      scalar value is usually needed, and in those few cases where it
      isn't, is less expensive on average than issuing a defensive
      SELECT.

      The "old" value is fetched unconditionally upon set only if the
      attribute container has the ``active_history`` flag set to
      ``True``.  This flag is set typically for primary key attributes
      and scalar object references that are not a simple many-to-one.
      To set this flag for any arbitrary mapped column, use the
      ``active_history`` argument with :func:`.column_property`.

    :param instance: mapped instance to be tested for pending changes.
    :param include_collections: Indicates if multivalued collections
     should be included in the operation.  Setting this to ``False`` is a
     way to detect only local-column based properties (i.e. scalar
     columns or many-to-one foreign keys) that would result in an UPDATE
     for this instance upon flush.

    """
    instance_state = object_state(instance)

    # cheap exit: nothing was ever flagged as changed on this state
    if not instance_state.modified:
        return False

    current_values = instance_state.dict

    for attribute in instance_state.manager.attributes:
        impl = attribute.impl

        # implementations without history support cannot be tested
        if not hasattr(impl, "get_history"):
            continue

        # collection-holding attributes are skipped on request
        if not include_collections and hasattr(impl, "get_collection"):
            continue

        added, _unchanged, deleted = impl.get_history(
            instance_state, current_values, passive=PassiveFlag.NO_CHANGE
        )

        if added or deleted:
            return True

    return False

4835 

@property
def is_active(self) -> bool:
    """True if this :class:`.Session` not in "partial rollback" state.

    .. versionchanged:: 1.4 The :class:`_orm.Session` no longer begins
       a new transaction immediately, so this attribute will be False
       when the :class:`_orm.Session` is first instantiated.

    "partial rollback" state typically indicates that the flush process
    of the :class:`_orm.Session` has failed, and that the
    :meth:`_orm.Session.rollback` method must be emitted in order to
    fully roll back the transaction.

    If this :class:`_orm.Session` is not in a transaction at all, the
    :class:`_orm.Session` will autobegin when it is first used, so in this
    case :attr:`_orm.Session.is_active` will return True.

    Otherwise, if this :class:`_orm.Session` is within a transaction,
    and that transaction has not been rolled back internally, the
    :attr:`_orm.Session.is_active` will also return True.

    .. seealso::

        :ref:`faq_session_rollback`

        :meth:`_orm.Session.in_transaction`

    """
    # no transaction at all counts as active
    if self._transaction is None:
        return True

    # otherwise defer to the transaction's own flag
    return self._transaction.is_active

4865 

4866 @property 

4867 def _dirty_states(self) -> Iterable[InstanceState[Any]]: 

4868 """The set of all persistent states considered dirty. 

4869 

4870 This method returns all states that were modified including 

4871 those that were possibly deleted. 

4872 

4873 """ 

4874 return self.identity_map._dirty_states() 

4875 

@property
def dirty(self) -> IdentitySet:
    """The set of all persistent instances considered dirty.

    E.g.::

        some_mapped_object in session.dirty

    An instance is considered dirty when it was modified but not
    deleted.

    Note that this 'dirty' calculation is 'optimistic'; most
    attribute-setting or collection modification operations will
    mark an instance as 'dirty' and place it in this set, even if
    there is no net change to the attribute's value.  At flush
    time, the value of each attribute is compared to its
    previously saved value, and if there's no net change, no SQL
    operation will occur (this is a more expensive operation so
    it's only done at flush time).

    To check if an instance has actionable net changes to its
    attributes, use the :meth:`.Session.is_modified` method.

    """
    dirty_objects = []
    for state in self._dirty_states:
        # states marked as deleted are not reported as dirty
        if state in self._deleted:
            continue
        dirty_objects.append(state.obj())
    return IdentitySet(dirty_objects)

4907 

@property
def deleted(self) -> IdentitySet:
    """The set of all instances marked as 'deleted' within this
    ``Session``.

    """
    marked_for_delete = list(self._deleted.values())
    return util.IdentitySet(marked_for_delete)

4913 

@property
def new(self) -> IdentitySet:
    """The set of all instances marked as 'new' within this
    ``Session``.

    """
    marked_as_new = list(self._new.values())
    return util.IdentitySet(marked_as_new)

4919 

4920 

# Type variable for the Session class a ``sessionmaker`` is parameterized
# with; bound to "Session" (given as a string forward reference).
_S = TypeVar("_S", bound="Session")

4922 

4923 

class sessionmaker(_SessionClassMethods, Generic[_S]):
    """A configurable :class:`.Session` factory.

    Calling a :class:`.sessionmaker` produces a new :class:`.Session`,
    constructed from the configurational arguments established on the
    factory.

    e.g.::

        from sqlalchemy import create_engine
        from sqlalchemy.orm import sessionmaker

        # an Engine, which the Session will use for connection
        # resources
        engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/")

        Session = sessionmaker(engine)

        with Session() as session:
            session.add(some_object)
            session.add(some_other_object)
            session.commit()

    Context manager use is optional; otherwise, the returned
    :class:`_orm.Session` object may be closed explicitly via the
    :meth:`_orm.Session.close` method.   Using a
    ``try:/finally:`` block is optional, however will ensure that the close
    takes place even if there are database errors::

        session = Session()
        try:
            session.add(some_object)
            session.add(some_other_object)
            session.commit()
        finally:
            session.close()

    :class:`.sessionmaker` acts as a factory for :class:`_orm.Session`
    objects in the same way as an :class:`_engine.Engine` acts as a factory
    for :class:`_engine.Connection` objects.  In this way it also includes
    a :meth:`_orm.sessionmaker.begin` method, that provides a context
    manager which both begins and commits a transaction, as well as closes
    out the :class:`_orm.Session` when complete, rolling back the transaction
    if any errors occur::

        Session = sessionmaker(engine)

        with Session.begin() as session:
            session.add(some_object)
            session.add(some_other_object)
        # commits transaction, closes session

    .. versionadded:: 1.4

    When calling upon :class:`_orm.sessionmaker` to construct a
    :class:`_orm.Session`, keyword arguments may also be passed to the
    method; these arguments will override that of the globally configured
    parameters.  Below we use a :class:`_orm.sessionmaker` bound to a certain
    :class:`_engine.Engine` to produce a :class:`_orm.Session` that is instead
    bound to a specific :class:`_engine.Connection` procured from that engine::

        Session = sessionmaker(engine)

        # bind an individual session to a connection

        with engine.connect() as connection:
            with Session(bind=connection) as session:
                ...  # work with session

    The class also includes a method :meth:`_orm.sessionmaker.configure`, which
    can be used to specify additional keyword arguments to the factory, which
    will take effect for subsequent :class:`.Session` objects generated. This
    is usually used to associate one or more :class:`_engine.Engine` objects
    with an existing
    :class:`.sessionmaker` factory before it is first used::

        # application starts, sessionmaker does not have
        # an engine bound yet
        Session = sessionmaker()

        # ... later, when an engine URL is read from a configuration
        # file or other events allow the engine to be created
        engine = create_engine("sqlite:///foo.db")
        Session.configure(bind=engine)

        sess = Session()
        # work with session

    .. seealso::

        :ref:`session_getting` - introductory text on creating
        sessions using :class:`.sessionmaker`.

    """

    # the per-factory subclass that __call__() instantiates
    class_: Type[_S]

    @overload
    def __init__(
        self,
        bind: Optional[_SessionBind] = ...,
        *,
        class_: Type[_S],
        autoflush: bool = ...,
        expire_on_commit: bool = ...,
        info: Optional[_InfoType] = ...,
        **kw: Any,
    ): ...

    @overload
    def __init__(
        self: "sessionmaker[Session]",
        bind: Optional[_SessionBind] = ...,
        *,
        autoflush: bool = ...,
        expire_on_commit: bool = ...,
        info: Optional[_InfoType] = ...,
        **kw: Any,
    ): ...

    def __init__(
        self,
        bind: Optional[_SessionBind] = None,
        *,
        class_: Type[_S] = Session,  # type: ignore
        autoflush: bool = True,
        expire_on_commit: bool = True,
        info: Optional[_InfoType] = None,
        **kw: Any,
    ):
        r"""Construct a new :class:`.sessionmaker`.

        All arguments here except for ``class_`` correspond to arguments
        accepted by :class:`.Session` directly.  See the
        :meth:`.Session.__init__` docstring for more details on parameters.

        :param bind: a :class:`_engine.Engine` or other :class:`.Connectable`
         with
         which newly created :class:`.Session` objects will be associated.
        :param class\_: class to use in order to create new :class:`.Session`
         objects.  Defaults to :class:`.Session`.
        :param autoflush: The autoflush setting to use with newly created
         :class:`.Session` objects.

         .. seealso::

            :ref:`session_flushing` - additional background on autoflush

        :param expire_on_commit=True: the
         :paramref:`_orm.Session.expire_on_commit` setting to use
         with newly created :class:`.Session` objects.

        :param info: optional dictionary of information that will be available
         via :attr:`.Session.info`.  Note this dictionary is *updated*, not
         replaced, when the ``info`` parameter is specified to the specific
         :class:`.Session` construction operation.

        :param \**kw: all other keyword arguments are passed to the
         constructor of newly created :class:`.Session` objects.

        """
        # fold the explicitly named options into the keyword dictionary
        # that is replayed on every call to the factory
        kw.update(
            bind=bind,
            autoflush=autoflush,
            expire_on_commit=expire_on_commit,
        )
        if info is not None:
            kw["info"] = info
        self.kw = kw

        # a dedicated subclass of the given class is created so that
        # events can be associated with this factory specifically.
        self.class_ = type(class_.__name__, (class_,), {})

    def begin(self) -> contextlib.AbstractContextManager[_S]:
        """Produce a context manager that both provides a new
        :class:`_orm.Session` as well as a transaction that commits.


        e.g.::

            Session = sessionmaker(some_engine)

            with Session.begin() as session:
                session.add(some_object)

            # commits transaction, closes session

        .. versionadded:: 1.4


        """

        return self()._maker_context_manager()

    def __call__(self, **local_kw: Any) -> _S:
        """Produce a new :class:`.Session` object using the configuration
        established in this :class:`.sessionmaker`.

        In Python, the ``__call__`` method is invoked on an object when
        it is "called" in the same way as a function::

            Session = sessionmaker(some_engine)
            session = Session()  # invokes sessionmaker.__call__()

        """
        for key, configured in self.kw.items():
            if key != "info" or "info" not in local_kw:
                # call-time arguments win; factory values fill the gaps
                local_kw.setdefault(key, configured)
                continue

            # "info" given on both sides: start from a copy of the factory
            # dictionary and layer the call-time entries on top
            merged_info = configured.copy()
            merged_info.update(local_kw["info"])
            local_kw["info"] = merged_info
        return self.class_(**local_kw)

    def configure(self, **new_kw: Any) -> None:
        """(Re)configure the arguments for this sessionmaker.

        e.g.::

            Session = sessionmaker()

            Session.configure(bind=create_engine("sqlite://"))
        """
        self.kw.update(new_kw)

    def __repr__(self) -> str:
        factory_name = self.__class__.__name__
        session_class_name = self.class_.__name__
        rendered_kw = ", ".join("%s=%r" % item for item in self.kw.items())
        return "%s(class_=%r, %s)" % (
            factory_name,
            session_class_name,
            rendered_kw,
        )

5154 

5155 

def close_all_sessions() -> None:
    """Close every session currently held in memory.

    A global registry of all :class:`.Session` objects is consulted and
    :meth:`.Session.close` is invoked on each of them, which resets them
    to a clean state.

    This function is not for general use but may be useful for test suites
    within the teardown scheme.

    .. versionadded:: 1.3

    """

    registered_sessions = _sessions.values()
    for session in registered_sessions:
        session.close()

5172 

5173 

def make_transient(instance: object) -> None:
    """Alter the state of the given instance so that it is :term:`transient`.

    .. note::

        :func:`.make_transient` is a special-case function for
        advanced use cases only.

    The given mapped instance is assumed to be in the :term:`persistent` or
    :term:`detached` state.   Its association with any :class:`.Session` is
    removed, as is its :attr:`.InstanceState.identity`.   The effect is that
    the object will behave as though it were newly constructed, except
    retaining any attribute / collection values that were loaded at the
    time of the call.   The :attr:`.InstanceState.deleted` flag is also reset
    if this object had been deleted as a result of using
    :meth:`.Session.delete`.

    .. warning::

        :func:`.make_transient` does **not** "unexpire" or otherwise eagerly
        load ORM-mapped attributes that are not currently loaded at the time
        the function is called.   This includes attributes which:

        * were expired via :meth:`.Session.expire`

        * were expired as the natural effect of committing a session
          transaction, e.g. :meth:`.Session.commit`

        * are normally :term:`lazy loaded` but are not currently loaded

        * are "deferred" (see :ref:`orm_queryguide_column_deferral`) and are
          not yet loaded

        * were not present in the query which loaded this object, such as that
          which is common in joined table inheritance and other scenarios.

        After :func:`.make_transient` is called, unloaded attributes such
        as those above will normally resolve to the value ``None`` when
        accessed, or an empty collection for a collection-oriented attribute.
        As the object is transient and un-associated with any database
        identity, it will no longer retrieve these values.

    .. seealso::

        :func:`.make_transient_to_detached`

    """
    state = attributes.instance_state(instance)

    # detach from whichever session currently holds the state, if any
    owning_session = _state_session(state)
    if owning_session:
        owning_session._expunge_states([state])

    # forget which attributes were expired
    state.expired_attributes.clear()

    # in order: drop deferred callables, then the identity key, then the
    # deleted marker -- each only when it is currently set
    for attr_name in ("callables", "key", "_deleted"):
        if getattr(state, attr_name):
            delattr(state, attr_name)

5237 

5238 

def make_transient_to_detached(instance: object) -> None:
    """Make the given transient instance :term:`detached`.

    .. note::

        :func:`.make_transient_to_detached` is a special-case function for
        advanced use cases only.

    All attribute history on the given instance
    will be reset as though the instance were freshly loaded
    from a query.  Missing attributes will be marked as expired.
    The primary key attributes of the object, which are required, will be made
    into the "key" of the instance.

    The object can then be added to a session, or merged
    possibly with the load=False flag, at which point it will look
    as if it were loaded that way, without emitting SQL.

    This is a special use case function that differs from a normal
    call to :meth:`.Session.merge` in that a given persistent state
    can be manufactured without any SQL calls.

    :raises sqlalchemy.exc.InvalidRequestError: if the instance already
     has a session id or an identity key, i.e. is not transient.

    .. seealso::

        :func:`.make_transient`

        :meth:`.Session.enable_relationship_loading`

    """
    state = attributes.instance_state(instance)

    is_transient = not (state.session_id or state.key)
    if not is_transient:
        raise sa_exc.InvalidRequestError("Given object must be transient")

    # the identity key is derived from the state by its mapper
    identity_key = state.mapper._identity_key_from_state(state)
    state.key = identity_key

    if state._deleted:
        del state._deleted

    # commit what is present first; whatever is still reported as
    # unloaded afterwards is then expired
    state._commit_all(state.dict)
    state._expire_attributes(state.dict, state.unloaded)

5276 

5277 

def object_session(instance: object) -> Optional[Session]:
    """Return the :class:`.Session` to which the given instance belongs.

    This is essentially the same as the :attr:`.InstanceState.session`
    accessor.  See that attribute for details.

    :raises sqlalchemy.orm.exc.UnmappedInstanceError: if no instance state
     can be located for the given object.

    """

    try:
        instance_state = attributes.instance_state(instance)
    except exc.NO_STATE as no_state:
        raise exc.UnmappedInstanceError(instance) from no_state

    return _state_session(instance_state)

5292 

5293 

# Module-level counter object, created once when this module is imported.
# NOTE(review): presumably the source of the per-Session ids that
# InstanceState.session_id refers to -- confirm against Session.__init__.
_new_sessionid = util.counter()