Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/util.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

824 statements  

1# orm/util.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9from __future__ import annotations 

10 

11import enum 

12import functools 

13import re 

14import types 

15import typing 

16from typing import AbstractSet 

17from typing import Any 

18from typing import Callable 

19from typing import cast 

20from typing import Dict 

21from typing import FrozenSet 

22from typing import Generic 

23from typing import get_origin 

24from typing import Iterable 

25from typing import Iterator 

26from typing import List 

27from typing import Literal 

28from typing import Match 

29from typing import Optional 

30from typing import Protocol 

31from typing import Sequence 

32from typing import Tuple 

33from typing import Type 

34from typing import TYPE_CHECKING 

35from typing import TypeVar 

36from typing import Union 

37import weakref 

38 

39from . import attributes # noqa 

40from . import exc 

41from . import exc as orm_exc 

42from ._typing import _O 

43from ._typing import insp_is_aliased_class 

44from ._typing import insp_is_mapper 

45from ._typing import prop_is_relationship 

46from .base import _class_to_mapper as _class_to_mapper 

47from .base import _MappedAnnotationBase 

48from .base import _never_set as _never_set # noqa: F401 

49from .base import _none_only_set as _none_only_set # noqa: F401 

50from .base import _none_set as _none_set # noqa: F401 

51from .base import attribute_str as attribute_str # noqa: F401 

52from .base import class_mapper as class_mapper 

53from .base import DynamicMapped 

54from .base import InspectionAttr as InspectionAttr 

55from .base import instance_str as instance_str # noqa: F401 

56from .base import Mapped 

57from .base import object_mapper as object_mapper 

58from .base import object_state as object_state # noqa: F401 

59from .base import opt_manager_of_class 

60from .base import ORMDescriptor 

61from .base import state_attribute_str as state_attribute_str # noqa: F401 

62from .base import state_class_str as state_class_str # noqa: F401 

63from .base import state_str as state_str # noqa: F401 

64from .base import WriteOnlyMapped 

65from .interfaces import CriteriaOption 

66from .interfaces import MapperProperty as MapperProperty 

67from .interfaces import ORMColumnsClauseRole 

68from .interfaces import ORMEntityColumnsClauseRole 

69from .interfaces import ORMFromClauseRole 

70from .path_registry import PathRegistry as PathRegistry 

71from .. import event 

72from .. import exc as sa_exc 

73from .. import inspection 

74from .. import sql 

75from .. import util 

76from ..engine.result import result_tuple 

77from ..sql import coercions 

78from ..sql import expression 

79from ..sql import lambdas 

80from ..sql import roles 

81from ..sql import util as sql_util 

82from ..sql import visitors 

83from ..sql._typing import is_selectable 

84from ..sql.annotation import SupportsCloneAnnotations 

85from ..sql.base import ColumnCollection 

86from ..sql.cache_key import HasCacheKey 

87from ..sql.cache_key import MemoizedHasCacheKey 

88from ..sql.elements import ColumnElement 

89from ..sql.elements import KeyedColumnElement 

90from ..sql.selectable import FromClause 

91from ..util.langhelpers import MemoizedSlots 

92from ..util.typing import de_stringify_annotation as _de_stringify_annotation 

93from ..util.typing import eval_name_only as _eval_name_only 

94from ..util.typing import fixup_container_fwd_refs 

95from ..util.typing import GenericProtocol 

96from ..util.typing import is_origin_of_cls 

97from ..util.typing import TupleAny 

98from ..util.typing import Unpack 

99 

100if typing.TYPE_CHECKING: 

101 from ._typing import _EntityType 

102 from ._typing import _IdentityKeyType 

103 from ._typing import _InternalEntityType 

104 from ._typing import _ORMCOLEXPR 

105 from .context import _MapperEntity 

106 from .context import _ORMCompileState 

107 from .mapper import Mapper 

108 from .path_registry import _AbstractEntityRegistry 

109 from .query import Query 

110 from .relationships import RelationshipProperty 

111 from ..engine import Row 

112 from ..engine import RowMapping 

113 from ..sql._typing import _CE 

114 from ..sql._typing import _ColumnExpressionArgument 

115 from ..sql._typing import _EquivalentColumnMap 

116 from ..sql._typing import _FromClauseArgument 

117 from ..sql._typing import _OnClauseArgument 

118 from ..sql._typing import _PropagateAttrsType 

119 from ..sql.annotation import _SA 

120 from ..sql.base import ReadOnlyColumnCollection 

121 from ..sql.elements import BindParameter 

122 from ..sql.selectable import _ColumnsClauseElement 

123 from ..sql.selectable import Select 

124 from ..sql.selectable import Selectable 

125 from ..sql.visitors import anon_map 

126 from ..util.typing import _AnnotationScanType 

127 from ..util.typing import _MatchedOnType 

128 

_T = TypeVar("_T", bound=Any)

# Every cascade token this module recognizes; CascadeOptions validates
# incoming values against this set.
all_cascades = frozenset(
    [
        "delete",
        "delete-orphan",
        "all",
        "merge",
        "expunge",
        "save-update",
        "refresh-expire",
        "none",
    ]
)

143 

# A "partial of partial": calling ``_de_stringify_partial(fn)`` returns
# ``functools.partial(fn, locals_=...)``, i.e. ``fn`` with its ``locals_``
# keyword pre-bound to an immutable mapping of the ORM annotation names
# ``Mapped``, ``WriteOnlyMapped`` and ``DynamicMapped``.
_de_stringify_partial = functools.partial(
    functools.partial,
    locals_=util.immutabledict(
        {
            "Mapped": Mapped,
            "WriteOnlyMapped": WriteOnlyMapped,
            "DynamicMapped": DynamicMapped,
        }
    ),
)

# partial is practically useless as we have to write out the whole
# function and maintain the signature anyway

157 

158 

class _DeStringifyAnnotation(Protocol):
    """Typing-only call signature for ``de_stringify_annotation``.

    Spells out the signature that callers see once ``locals_`` has been
    pre-bound through ``functools.partial``, since the partial object
    itself carries no signature for type checkers.
    """

    def __call__(
        self,
        cls: Type[Any],
        annotation: _AnnotationScanType,
        originating_module: str,
        *,
        str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
        include_generic: bool = False,
    ) -> _MatchedOnType: ...

169 

170 

# ``_de_stringify_annotation`` with ``locals_`` pre-bound to the ORM
# annotation names; the cast gives the partial an explicit call signature.
de_stringify_annotation = cast(
    _DeStringifyAnnotation, _de_stringify_partial(_de_stringify_annotation)
)

174 

175 

class _EvalNameOnly(Protocol):
    """Typing-only call signature for ``eval_name_only``."""

    def __call__(self, name: str, module_name: str) -> Any: ...

178 

179 

# ``_eval_name_only`` with ``locals_`` pre-bound to the ORM annotation
# names; the cast gives the partial an explicit call signature.
eval_name_only = cast(_EvalNameOnly, _de_stringify_partial(_eval_name_only))

181 

182 

class CascadeOptions(FrozenSet[str]):
    """Immutable set of the cascade names passed to
    :paramref:`.relationship.cascade`.

    Besides behaving as a frozenset of the effective cascade names, each
    instance exposes one boolean attribute per cascade (``save_update``,
    ``delete``, ``refresh_expire``, ``merge``, ``expunge``,
    ``delete_orphan``).
    """

    # cascades that the "all" shorthand expands to
    _add_w_all_cascades = all_cascades.difference(
        ["all", "none", "delete-orphan"]
    )
    _allowed_cascades = all_cascades

    _viewonly_cascades = ["expunge", "all", "none", "refresh-expire", "merge"]

    __slots__ = (
        "save_update",
        "delete",
        "refresh_expire",
        "merge",
        "expunge",
        "delete_orphan",
    )

    save_update: bool
    delete: bool
    refresh_expire: bool
    merge: bool
    expunge: bool
    delete_orphan: bool

    def __new__(
        cls, value_list: Optional[Union[Iterable[str], str]]
    ) -> CascadeOptions:
        """Build the option set from an iterable of names, a
        comma-separated string, or ``None``.

        :raises sqlalchemy.exc.ArgumentError: if any name is not one of
         the allowed cascades.
        """
        # strings and None are parsed by from_string(), which calls back
        # into this constructor with a list
        if value_list is None or isinstance(value_list, str):
            return cls.from_string(value_list)  # type: ignore

        requested = set(value_list)
        unknown = requested.difference(cls._allowed_cascades)
        if unknown:
            raise sa_exc.ArgumentError(
                "Invalid cascade option(s): %s"
                % ", ".join([repr(name) for name in sorted(unknown)])
            )

        # "all" expands to its member cascades; "none" wipes everything,
        # including anything "all" just added
        if "all" in requested:
            requested.update(cls._add_w_all_cascades)
        if "none" in requested:
            requested.clear()
        requested.discard("all")

        self = super().__new__(cls, requested)
        for attr_name, cascade_name in (
            ("save_update", "save-update"),
            ("delete", "delete"),
            ("refresh_expire", "refresh-expire"),
            ("merge", "merge"),
            ("expunge", "expunge"),
            ("delete_orphan", "delete-orphan"),
        ):
            setattr(self, attr_name, cascade_name in requested)

        if self.delete_orphan and not self.delete:
            util.warn("The 'delete-orphan' cascade option requires 'delete'.")
        return self

    def __repr__(self):
        joined = ",".join(sorted(self))
        return f"CascadeOptions({joined!r})"

    @classmethod
    def from_string(cls, arg):
        """Parse a comma-separated cascade string (or ``None``) into a
        :class:`.CascadeOptions`; empty tokens are dropped."""
        tokens = re.split(r"\s*,\s*", arg or "")
        return cls([token for token in tokens if token])

254 

255 

def _validator_events(desc, key, validator, include_removes, include_backrefs):
    """Register attribute listeners that run a validation method on a
    value that is about to be set or appended.

    :param desc: target passed to ``event.listen()``.
    :param key: attribute name handed to the validator and used to look up
     the attribute implementation on ``state.manager``.
    :param validator: callable invoked as ``validator(obj, key, value)``,
     or ``validator(obj, key, value, is_remove)`` when ``include_removes``
     is set.
    :param include_removes: if true, also listen for "remove" events and
     pass the extra ``is_remove`` flag to the validator.
    :param include_backrefs: if false, events whose initiator belongs to a
     different attribute implementation are passed through unvalidated.
    """

    if include_backrefs:

        def should_validate(state, initiator):
            return True

    else:

        def should_validate(state, initiator):
            # an initiator from another attribute implementation means
            # the event arrived via a backref
            impl = state.manager[key].impl
            return initiator.impl is impl

    if include_removes:

        def validate(obj, value, is_remove):
            return validator(obj, key, value, is_remove)

    else:

        def validate(obj, value, is_remove):
            return validator(obj, key, value)

    def append(state, value, initiator):
        # appends that are part of a bulk replace are covered by bulk_set
        if initiator.op is attributes.OP_BULK_REPLACE:
            return value
        if not should_validate(state, initiator):
            return value
        return validate(state.obj(), value, False)

    def bulk_set(state, values, initiator):
        if should_validate(state, initiator):
            obj = state.obj()
            # replace in place so the caller's list sees validated values
            values[:] = [validate(obj, value, False) for value in values]

    def set_(state, value, oldvalue, initiator):
        if should_validate(state, initiator):
            return validate(state.obj(), value, False)
        return value

    event.listen(desc, "append", append, raw=True, retval=True)
    event.listen(desc, "bulk_replace", bulk_set, raw=True)
    event.listen(desc, "set", set_, raw=True, retval=True)

    if include_removes:

        def remove(state, value, initiator):
            if should_validate(state, initiator):
                validate(state.obj(), value, True)

        event.listen(desc, "remove", remove, raw=True, retval=True)

320 

321 

def polymorphic_union(
    table_map, typecolname, aliasname="p_union", cast_nulls=True
):
    """Create a ``UNION`` statement used by a polymorphic mapper.

    See :ref:`concrete_inheritance` for an example of how
    this is used.

    Each value in ``table_map`` is coerced to a FROM clause and written
    back into ``table_map`` under the same key.

    :param table_map: mapping of polymorphic identities to
     :class:`_schema.Table` objects.
    :param typecolname: string name of a "discriminator" column, which will be
     derived from the query, producing the polymorphic identity for
     each row.  If ``None``, no polymorphic discriminator is generated.
    :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()`
     construct generated.
    :param cast_nulls: if True, non-existent columns, which are represented
     as labeled NULLs, will be passed into CAST.   This is a legacy behavior
     that is problematic on some backends such as Oracle - in which case it
     can be set to False.

    :raises sqlalchemy.exc.InvalidRequestError: if a table already has a
     column whose key equals ``typecolname``.

    """

    all_keys: util.OrderedSet[str] = util.OrderedSet()
    columns_by_table = {}
    type_by_key = {}

    for identity in table_map:
        from_clause = coercions.expect(
            roles.FromClauseRole, table_map[identity]
        )
        table_map[identity] = from_clause

        by_key = {}
        for column in from_clause.c:
            if column.key == typecolname:
                raise sa_exc.InvalidRequestError(
                    "Polymorphic union can't use '%s' as the discriminator "
                    "column due to mapped column %r; please apply the "
                    "'typecolname' "
                    "argument; this is available on "
                    "ConcreteBase as '_concrete_discriminator_name'"
                    % (typecolname, column)
                )
            all_keys.add(column.key)
            by_key[column.key] = column
            type_by_key[column.key] = column.type
        columns_by_table[from_clause] = by_key

    # how a column absent from a given table is rendered: NULL wrapped in
    # CAST, or NULL merely type-coerced
    typed_null = sql.cast if cast_nulls else sql.type_coerce

    def col(name, table):
        try:
            return columns_by_table[table][name]
        except KeyError:
            return typed_null(sql.null(), type_by_key[name]).label(name)

    selects = []
    for type_, table in table_map.items():
        columns = [col(name, table) for name in all_keys]
        if typecolname is not None:
            columns.append(
                sql.literal_column(sql_util._quote_ddl_expr(type_)).label(
                    typecolname
                )
            )
        selects.append(sql.select(*columns).select_from(table))

    return sql.union_all(*selects).alias(aliasname)

400 

401 

def identity_key(
    class_: Optional[Type[_T]] = None,
    ident: Union[Any, Tuple[Any, ...]] = None,
    *,
    instance: Optional[_T] = None,
    row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None,
    identity_token: Optional[Any] = None,
) -> _IdentityKeyType[_T]:
    r"""Generate "identity key" tuples, as are used as keys in the
    :attr:`.Session.identity_map` dictionary.

    This function has several call styles:

    * ``identity_key(class, ident, identity_token=token)``

      This form receives a mapped class and a primary key scalar or
      tuple as an argument.

      E.g.::

        >>> identity_key(MyClass, (1, 2))
        (<class '__main__.MyClass'>, (1, 2), None)

      :param class: mapped class (must be a positional argument)
      :param ident: primary key, may be a scalar or tuple argument.
      :param identity_token: optional identity token

    * ``identity_key(instance=instance)``

      This form will produce the identity key for a given instance.  The
      instance need not be persistent, only that its primary key attributes
      are populated (else the key will contain ``None`` for those missing
      values).

      E.g.::

        >>> instance = MyClass(1, 2)
        >>> identity_key(instance=instance)
        (<class '__main__.MyClass'>, (1, 2), None)

      In this form, the given instance is ultimately run through
      :meth:`_orm.Mapper.identity_key_from_instance`, which will have the
      effect of performing a database check for the corresponding row
      if the object is expired.

      :param instance: object instance (must be given as a keyword arg)

    * ``identity_key(class, row=row, identity_token=token)``

      This form is similar to the class/tuple form, except is passed a
      database result row as a :class:`.Row` or :class:`.RowMapping` object.

      E.g.::

        >>> row = engine.execute(text("select * from table where a=1 and b=2")).first()
        >>> identity_key(MyClass, row=row)
        (<class '__main__.MyClass'>, (1, 2), None)

      :param class: mapped class (must be a positional argument)
      :param row: :class:`.Row` row returned by a :class:`_engine.CursorResult`
       (must be given as a keyword arg)
      :param identity_token: optional identity token

    :raises sqlalchemy.exc.ArgumentError: if neither a class nor an
     instance is given, or a class is given without ``ident`` or ``row``.

    """  # noqa: E501
    if class_ is None:
        # instance form; only consulted when no class was given
        if instance is None:
            raise sa_exc.ArgumentError("class or instance is required")
        return object_mapper(instance).identity_key_from_instance(instance)

    class_level_mapper = class_mapper(class_)

    # a row, when given, takes precedence over ident
    if row is not None:
        return class_level_mapper.identity_key_from_row(
            row, identity_token=identity_token
        )

    if ident is None:
        raise sa_exc.ArgumentError("ident or row is required")

    primary_key = tuple(util.to_list(ident))
    return class_level_mapper.identity_key_from_primary_key(
        primary_key, identity_token=identity_token
    )

483 

484 

class _TraceAdaptRole(enum.Enum):
    """Enumeration of all the use cases for ORMAdapter.

    ORMAdapter remains one of the most complicated aspects of the ORM, as it is
    used for in-place adaption of column expressions to be applied to a SELECT,
    replacing :class:`.Table` and other objects that are mapped to classes with
    aliases of those tables in the case of joined eager loading, or in the case
    of polymorphic loading as used with concrete mappings or other custom "with
    polymorphic" parameters, with whole user-defined subqueries. The
    enumerations provide an overview of all the use cases used by ORMAdapter, a
    layer of formality as to the introduction of new ORMAdapter use cases (of
    which none are anticipated), as well as a means to trace the origins of a
    particular ORMAdapter within runtime debugging.

    SQLAlchemy 2.0 has greatly scaled back ORM features which relied heavily on
    open-ended statement adaption, including the ``Query.with_polymorphic()``
    method and the ``Query.select_from_entity()`` methods, favoring
    user-explicit aliasing schemes using the ``aliased()`` and
    ``with_polymorphic()`` standalone constructs; these still use adaption,
    however the adaption is applied in a narrower scope.

    Member values come from ``enum.auto()``, so they follow declaration
    order; the names, not the numbers, are the meaningful part.

    """

    # aliased() use that is used to adapt individual attributes at query
    # construction time
    ALIASED_INSP = enum.auto()

    # joinedload cases; typically adapt an ON clause of a relationship
    # join
    JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto()
    JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto()
    JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto()

    # polymorphic cases - these are complex ones that replace FROM
    # clauses, replacing tables with subqueries
    MAPPER_POLYMORPHIC_ADAPTER = enum.auto()
    WITH_POLYMORPHIC_ADAPTER = enum.auto()
    WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto()
    DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto()

    # the from_statement() case, used only to adapt individual attributes
    # from a given statement to local ORM attributes at result fetching
    # time.  assigned to ORMCompileState._from_obj_alias
    ADAPT_FROM_STATEMENT = enum.auto()

    # the joinedload for queries that have LIMIT/OFFSET/DISTINCT case;
    # the query is placed inside of a subquery with the LIMIT/OFFSET/etc.,
    # joinedloads are then placed on the outside.
    # assigned to ORMCompileState.compound_eager_adapter
    COMPOUND_EAGER_STATEMENT = enum.auto()

    # the legacy Query._set_select_from() case.
    # this is needed for Query's set operations (i.e. UNION, etc. )
    # as well as "legacy from_self()", which while removed from 2.0 as
    # public API, is used for the Query.count() method.  this one
    # still does full statement traversal
    # assigned to ORMCompileState._from_obj_alias
    LEGACY_SELECT_FROM_ALIAS = enum.auto()

543 

544 

class ORMStatementAdapter(sql_util.ColumnAdapter):
    """ColumnAdapter which includes a role attribute.

    The ``role`` argument is stored on the instance as ``self.role``; every
    other argument is forwarded unchanged to ``ColumnAdapter.__init__()``.
    """

    __slots__ = ("role",)

    def __init__(
        self,
        role: _TraceAdaptRole,
        selectable: Selectable,
        *,
        equivalents: Optional[_EquivalentColumnMap] = None,
        adapt_required: bool = False,
        allow_label_resolve: bool = True,
        anonymize_labels: bool = False,
        adapt_on_names: bool = False,
        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
    ):
        # record which _TraceAdaptRole use case created this adapter
        self.role = role
        super().__init__(
            selectable,
            equivalents=equivalents,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            adapt_on_names=adapt_on_names,
            adapt_from_selectables=adapt_from_selectables,
        )

572 

573 

class ORMAdapter(sql_util.ColumnAdapter):
    """ColumnAdapter subclass which excludes adaptation of entities from
    non-matching mappers.

    The adapter records the :class:`._TraceAdaptRole` it was created for,
    the mapper of the entity it adapts, and, when that entity is an aliased
    class, its inspection object.

    """

    __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp")

    is_aliased_class: bool
    aliased_insp: Optional[AliasedInsp[Any]]

    def __init__(
        self,
        role: _TraceAdaptRole,
        entity: _InternalEntityType[Any],
        *,
        equivalents: Optional[_EquivalentColumnMap] = None,
        adapt_required: bool = False,
        allow_label_resolve: bool = True,
        anonymize_labels: bool = False,
        selectable: Optional[Selectable] = None,
        limit_on_entity: bool = True,
        adapt_on_names: bool = False,
        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
    ):
        self.role = role
        self.mapper = entity.mapper

        # adapt towards the entity's own selectable unless told otherwise
        if selectable is None:
            selectable = entity.selectable

        if insp_is_aliased_class(entity):
            self.is_aliased_class, self.aliased_insp = True, entity
        else:
            self.is_aliased_class, self.aliased_insp = False, None

        # with limit_on_entity, only elements belonging to a related
        # mapper are adapted; otherwise no include filter is installed
        include_fn = self._include_fn if limit_on_entity else None

        super().__init__(
            selectable,
            equivalents=equivalents,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            include_fn=include_fn,
            adapt_on_names=adapt_on_names,
            adapt_from_selectables=adapt_from_selectables,
        )

    def _include_fn(self, elem):
        """Return whether ``elem`` may be adapted: true when it carries no
        "parentmapper" annotation, or when that mapper and this adapter's
        mapper are related by ``isa()`` in either direction."""
        parent_mapper = elem._annotations.get("parentmapper", None)
        if not parent_mapper:
            return True

        return parent_mapper.isa(self.mapper) or self.mapper.isa(
            parent_mapper
        )

625 

626 

class AliasedClass(
    inspection.Inspectable["AliasedInsp[_O]"], ORMColumnsClauseRole[_O]
):
    r"""Represents an "aliased" form of a mapped class for usage with Query.

    The ORM equivalent of a :func:`~sqlalchemy.sql.expression.alias`
    construct, this object mimics the mapped class using a
    ``__getattr__`` scheme and maintains a reference to a
    real :class:`~sqlalchemy.sql.expression.Alias` object.

    A primary purpose of :class:`.AliasedClass` is to serve as an alternate
    within a SQL statement generated by the ORM, such that an existing
    mapped entity can be used in multiple contexts.   A simple example::

        # find all pairs of users with the same name
        user_alias = aliased(User)
        session.query(User, user_alias).join(
            (user_alias, User.id > user_alias.id)
        ).filter(User.name == user_alias.name)

    :class:`.AliasedClass` is also capable of mapping an existing mapped
    class to an entirely new selectable, provided this selectable is column-
    compatible with the existing mapped selectable, and it can also be
    configured in a mapping as the target of a :func:`_orm.relationship`.
    See the links below for examples.

    The :class:`.AliasedClass` object is constructed typically using the
    :func:`_orm.aliased` function.   It also is produced with additional
    configuration when using the :func:`_orm.with_polymorphic` function.

    The resulting object is an instance of :class:`.AliasedClass`.
    This object implements an attribute scheme which produces the
    same attribute and method interface as the original mapped
    class, allowing :class:`.AliasedClass` to be compatible
    with any attribute technique which works on the original class,
    including hybrid attributes (see :ref:`hybrids_toplevel`).

    The :class:`.AliasedClass` can be inspected for its underlying
    :class:`_orm.Mapper`, aliased selectable, and other information
    using :func:`_sa.inspect`::

        from sqlalchemy import inspect

        my_alias = aliased(MyClass)
        insp = inspect(my_alias)

    The resulting inspection object is an instance of :class:`.AliasedInsp`.


    .. seealso::

        :func:`.aliased`

        :func:`.with_polymorphic`

        :ref:`relationship_aliased_class`

        :ref:`relationship_to_window_function`


    """

    __name__: str

    def __init__(
        self,
        mapped_class_or_ac: _EntityType[_O],
        alias: Optional[FromClause] = None,
        name: Optional[str] = None,
        flat: bool = False,
        adapt_on_names: bool = False,
        with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]] = None,
        with_polymorphic_discriminator: Optional[ColumnElement[Any]] = None,
        base_alias: Optional[AliasedInsp[Any]] = None,
        use_mapper_path: bool = False,
        represents_outer_join: bool = False,
    ):
        insp = cast(
            "_InternalEntityType[_O]", inspection.inspect(mapped_class_or_ac)
        )
        mapper = insp.mapper

        nest_adapters = False

        if alias is None:
            # no selectable given: alias the aliased entity's subquery, or
            # else an anonymous FROM clause of the mapper's polymorphic
            # selectable
            if insp.is_aliased_class and insp.selectable._is_subquery:
                alias = insp.selectable.alias()
            else:
                alias = (
                    mapper._with_polymorphic_selectable._anonymous_fromclause(
                        name=name,
                        flat=flat,
                    )
                )
        elif insp.is_aliased_class:
            # explicit selectable on top of an existing aliased entity
            nest_adapters = True

        assert alias is not None
        self._aliased_insp = AliasedInsp(
            self,
            insp,
            alias,
            name,
            (
                with_polymorphic_mappers
                if with_polymorphic_mappers
                else mapper.with_polymorphic_mappers
            ),
            (
                with_polymorphic_discriminator
                if with_polymorphic_discriminator is not None
                else mapper.polymorphic_on
            ),
            base_alias,
            use_mapper_path,
            adapt_on_names,
            represents_outer_join,
            nest_adapters,
        )

        self.__name__ = f"aliased({mapper.class_.__name__})"

    @classmethod
    def _reconstitute_from_aliased_insp(
        cls, aliased_insp: AliasedInsp[_O]
    ) -> AliasedClass[_O]:
        """Build an :class:`.AliasedClass` around an existing
        :class:`.AliasedInsp` without running ``__init__``.

        For a with-polymorphic inspection, each sub-entity other than
        ``aliased_insp`` itself is reconstituted as well and attached as an
        attribute named after its class.
        """
        obj = cls.__new__(cls)
        obj.__name__ = f"aliased({aliased_insp.mapper.class_.__name__})"
        obj._aliased_insp = aliased_insp

        if aliased_insp._is_with_polymorphic:
            for sub_aliased_insp in aliased_insp._with_polymorphic_entities:
                if sub_aliased_insp is not aliased_insp:
                    ent = AliasedClass._reconstitute_from_aliased_insp(
                        sub_aliased_insp
                    )
                    setattr(obj, sub_aliased_insp.class_.__name__, ent)

        return obj

    def __getattr__(self, key: str) -> Any:
        """Proxy attribute access to the aliased target.

        Invoked only when normal lookup fails.  The attribute is fetched
        from the target, rebound or adapted to this alias where applicable,
        and adapted attributes are cached on the instance.

        :raises AttributeError: naming ``key``, if this object has no
         ``_aliased_insp`` yet, or as raised by the target lookup.
        """
        # read __dict__ directly: going through normal attribute access
        # for a missing "_aliased_insp" would re-enter __getattr__
        try:
            _aliased_insp = self.__dict__["_aliased_insp"]
        except KeyError:
            # name the attribute and drop the internal KeyError context
            raise AttributeError(key) from None
        else:
            target = _aliased_insp._target
            # maintain all getattr mechanics
            attr = getattr(target, key)

        # attribute is a method, that will be invoked against a
        # "self"; so just return a new method with the same function and
        # new self
        if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
            return types.MethodType(attr.__func__, self)

        # attribute is a descriptor, that will be invoked against a
        # "self"; so invoke the descriptor against this self
        if hasattr(attr, "__get__"):
            attr = attr.__get__(None, self)

        # attributes within the QueryableAttribute system will want this
        # to be invoked so the object can be adapted
        if hasattr(attr, "adapt_to_entity"):
            attr = attr.adapt_to_entity(_aliased_insp)
            # cache on the instance so later lookups skip __getattr__
            setattr(self, key, attr)

        return attr

    def _get_from_serialized(
        self, key: str, mapped_class: _O, aliased_insp: AliasedInsp[_O]
    ) -> Any:
        """Variant of ``__getattr__`` that looks ``key`` up on an explicitly
        given ``mapped_class`` and adapts it to the given ``aliased_insp``,
        re-pointing that inspection's weak entity reference at ``self``."""
        # this method is only used in terms of the
        # sqlalchemy.ext.serializer extension
        attr = getattr(mapped_class, key)
        if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
            return types.MethodType(attr.__func__, self)

        # attribute is a descriptor, that will be invoked against a
        # "self"; so invoke the descriptor against this self
        if hasattr(attr, "__get__"):
            attr = attr.__get__(None, self)

        # attributes within the QueryableAttribute system will want this
        # to be invoked so the object can be adapted
        if hasattr(attr, "adapt_to_entity"):
            aliased_insp._weak_entity = weakref.ref(self)
            attr = attr.adapt_to_entity(aliased_insp)
            setattr(self, key, attr)

        return attr

    def __repr__(self) -> str:
        return "<AliasedClass at 0x%x; %s>" % (
            id(self),
            self._aliased_insp._target.__name__,
        )

    def __str__(self) -> str:
        return str(self._aliased_insp)

827 

828 

829@inspection._self_inspects 

830class AliasedInsp( 

831 ORMEntityColumnsClauseRole[_O], 

832 ORMFromClauseRole, 

833 HasCacheKey, 

834 InspectionAttr, 

835 MemoizedSlots, 

836 inspection.Inspectable["AliasedInsp[_O]"], 

837 Generic[_O], 

838): 

839 """Provide an inspection interface for an 

840 :class:`.AliasedClass` object. 

841 

842 The :class:`.AliasedInsp` object is returned 

843 given an :class:`.AliasedClass` using the 

844 :func:`_sa.inspect` function:: 

845 

846 from sqlalchemy import inspect 

847 from sqlalchemy.orm import aliased 

848 

849 my_alias = aliased(MyMappedClass) 

850 insp = inspect(my_alias) 

851 

852 Attributes on :class:`.AliasedInsp` 

853 include: 

854 

855 * ``entity`` - the :class:`.AliasedClass` represented. 

856 * ``mapper`` - the :class:`_orm.Mapper` mapping the underlying class. 

857 * ``selectable`` - the :class:`_expression.Alias` 

858 construct which ultimately 

859 represents an aliased :class:`_schema.Table` or 

860 :class:`_expression.Select` 

861 construct. 

862 * ``name`` - the name of the alias. Also is used as the attribute 

863 name when returned in a result tuple from :class:`_query.Query`. 

864 * ``with_polymorphic_mappers`` - collection of :class:`_orm.Mapper` 

865 objects 

866 indicating all those mappers expressed in the select construct 

867 for the :class:`.AliasedClass`. 

868 * ``polymorphic_on`` - an alternate column or SQL expression which 

869 will be used as the "discriminator" for a polymorphic load. 

870 

871 .. seealso:: 

872 

873 :ref:`inspection_toplevel` 

874 

875 """ 

876 

# Fixed attribute layout for instances.  "__weakref__" is listed
# explicitly; __init__ takes a ``weakref.ref()`` to an instance of this
# class (``_base_alias or self``).
__slots__ = (
    "__weakref__",
    "_weak_entity",
    "mapper",
    "selectable",
    "name",
    "_adapt_on_names",
    "with_polymorphic_mappers",
    "polymorphic_on",
    "_use_mapper_path",
    "_base_alias",
    "represents_outer_join",
    "persist_selectable",
    "local_table",
    "_is_with_polymorphic",
    "_with_polymorphic_entities",
    "_adapter",
    "_target",
    "__clause_element__",
    "_memoized_values",
    "_all_column_expressions",
    "_nest_adapters",
)

# (attribute name, traversal symbol) pairs; each name is an attribute
# assigned in __init__.
_cache_key_traversal = [
    ("name", visitors.ExtendedInternalTraversal.dp_string),
    ("_adapt_on_names", visitors.ExtendedInternalTraversal.dp_boolean),
    ("_use_mapper_path", visitors.ExtendedInternalTraversal.dp_boolean),
    ("_target", visitors.ExtendedInternalTraversal.dp_inspectable),
    ("selectable", visitors.ExtendedInternalTraversal.dp_clauseelement),
    (
        "with_polymorphic_mappers",
        visitors.InternalTraversal.dp_has_cache_key_list,
    ),
    ("polymorphic_on", visitors.InternalTraversal.dp_clauseelement),
]

# typing-only declarations for attributes populated in __init__
mapper: Mapper[_O]
selectable: FromClause
_adapter: ORMAdapter
with_polymorphic_mappers: Sequence[Mapper[Any]]
_with_polymorphic_entities: Sequence[AliasedInsp[Any]]

_weak_entity: weakref.ref[AliasedClass[_O]]
"""the AliasedClass that refers to this AliasedInsp"""

_target: Union[Type[_O], AliasedClass[_O]]
"""the thing referenced by the AliasedClass/AliasedInsp.

In the vast majority of cases, this is the mapped class.  However
it may also be another AliasedClass (alias of alias).

"""

930 

def __init__(
    self,
    entity: AliasedClass[_O],
    inspected: _InternalEntityType[_O],
    selectable: FromClause,
    name: Optional[str],
    with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]],
    polymorphic_on: Optional[ColumnElement[Any]],
    _base_alias: Optional[AliasedInsp[Any]],
    _use_mapper_path: bool,
    adapt_on_names: bool,
    represents_outer_join: bool,
    nest_adapters: bool,
):
    """Build the inspection state backing ``entity``.

    :param entity: the :class:`.AliasedClass` this object belongs to;
     only a weak reference to it is stored.
    :param inspected: inspection of the aliased target; its ``.entity``
     becomes ``_target`` and its ``.mapper`` becomes ``mapper``.
    :param selectable: stored as ``selectable``, ``persist_selectable``
     and ``local_table``.
    :param name: stored as ``name``.
    :param with_polymorphic_mappers: when non-empty, an
     :class:`.AliasedClass` is created for every mapper other than the
     primary one and attached to ``entity``.
    :param polymorphic_on: stored as ``polymorphic_on``.
    :param _base_alias: weakly referenced as ``_base_alias``; falls back
     to ``self`` when falsy.
    :param _use_mapper_path: stored as ``_use_mapper_path``.
    :param adapt_on_names: forwarded to the :class:`.ORMAdapter` and to
     the per-subclass aliases; stored as ``_adapt_on_names``.
    :param represents_outer_join: stored as ``represents_outer_join``.
    :param nest_adapters: when true, ``inspected`` must be an
     :class:`.AliasedInsp` and its adapter wraps the new one.
    """
    mapped_class_or_ac = inspected.entity
    mapper = inspected.mapper

    # must be assigned before the loop below, which reads ``self.entity``
    self._weak_entity = weakref.ref(entity)
    self.mapper = mapper
    self.selectable = self.persist_selectable = self.local_table = (
        selectable
    )
    self.name = name
    self.polymorphic_on = polymorphic_on
    self._base_alias = weakref.ref(_base_alias or self)
    self._use_mapper_path = _use_mapper_path
    self.represents_outer_join = represents_outer_join
    self._nest_adapters = nest_adapters

    if with_polymorphic_mappers:
        self._is_with_polymorphic = True
        self.with_polymorphic_mappers = with_polymorphic_mappers
        self._with_polymorphic_entities = []
        for poly in self.with_polymorphic_mappers:
            if poly is not mapper:
                # sub-entity against the same selectable, with this
                # object as its base alias
                ent = AliasedClass(
                    poly.class_,
                    selectable,
                    base_alias=self,
                    adapt_on_names=adapt_on_names,
                    use_mapper_path=_use_mapper_path,
                )

                # published on the AliasedClass under the subclass name;
                # _entity_for_mapper() reads it back with getattr()
                setattr(self.entity, poly.class_.__name__, ent)
                self._with_polymorphic_entities.append(ent._aliased_insp)

    else:
        self._is_with_polymorphic = False
        self.with_polymorphic_mappers = [mapper]

    self._adapter = ORMAdapter(
        _TraceAdaptRole.ALIASED_INSP,
        mapper,
        selectable=selectable,
        equivalents=mapper._equivalent_columns,
        adapt_on_names=adapt_on_names,
        anonymize_labels=True,
        # make sure the adapter doesn't try to grab other tables that
        # are not even the thing we are mapping, such as embedded
        # selectables in subqueries or CTEs.  See issue #6060
        # (the set is empty whenever adapt_on_names is true)
        adapt_from_selectables={
            m.selectable
            for m in self.with_polymorphic_mappers
            if not adapt_on_names
        },
        limit_on_entity=False,
    )

    if nest_adapters:
        # supports "aliased class of aliased class" use case
        assert isinstance(inspected, AliasedInsp)
        self._adapter = inspected._adapter.wrap(self._adapter)

    self._adapt_on_names = adapt_on_names
    self._target = mapped_class_or_ac

1006 

@classmethod
def _alias_factory(
    cls,
    element: Union[_EntityType[_O], FromClause],
    alias: Optional[FromClause] = None,
    name: Optional[str] = None,
    flat: bool = False,
    adapt_on_names: bool = False,
) -> Union[AliasedClass[_O], FromClause]:
    """Produce an alias of ``element``.

    Anything that is not a :class:`.FromClause` is wrapped in an
    :class:`.AliasedClass`.  A :class:`.FromClause` is aliased directly:
    by ``name`` when one is given, otherwise through the
    ``AnonymizedFromClauseRole`` coercion.

    :raises ArgumentError: if ``adapt_on_names`` is set for a
     :class:`.FromClause`.
    """
    # ORM entity: every option goes to AliasedClass
    if not isinstance(element, FromClause):
        return AliasedClass(
            element,
            alias=alias,
            flat=flat,
            name=name,
            adapt_on_names=adapt_on_names,
        )

    if adapt_on_names:
        raise sa_exc.ArgumentError(
            "adapt_on_names only applies to ORM elements"
        )

    if name:
        return element.alias(name=name, flat=flat)

    return coercions.expect(
        roles.AnonymizedFromClauseRole, element, flat=flat
    )

1035 

@classmethod
def _with_polymorphic_factory(
    cls,
    base: Union[Type[_O], Mapper[_O]],
    classes: Union[Literal["*"], Iterable[_EntityType[Any]]],
    selectable: Union[Literal[False, None], FromClause] = False,
    flat: bool = False,
    polymorphic_on: Optional[ColumnElement[Any]] = None,
    aliased: bool = False,
    innerjoin: bool = False,
    adapt_on_names: bool = False,
    name: Optional[str] = None,
    _use_mapper_path: bool = False,
) -> AliasedClass[_O]:
    """Build the :class:`.AliasedClass` for a with_polymorphic() request.

    The mapper list and selectable come from the base mapper's
    ``_with_polymorphic_args()``; the selectable is made anonymous when
    ``aliased`` or ``flat`` is set.

    :raises ArgumentError: if ``flat`` is combined with an explicit
     ``selectable``.
    """
    base_mapper = _class_to_mapper(base)

    explicit_selectable_and_flat = selectable not in (None, False) and flat
    if explicit_selectable_and_flat:
        raise sa_exc.ArgumentError(
            "the 'flat' and 'selectable' arguments cannot be passed "
            "simultaneously to with_polymorphic()"
        )

    poly_mappers, poly_selectable = base_mapper._with_polymorphic_args(
        classes, selectable, innerjoin=innerjoin
    )

    if aliased or flat:
        assert poly_selectable is not None
        poly_selectable = poly_selectable._anonymous_fromclause(flat=flat)

    # an inner join never produces the NULL rows of an outer join
    outer = not innerjoin
    return AliasedClass(
        base,
        poly_selectable,
        name=name,
        with_polymorphic_mappers=poly_mappers,
        adapt_on_names=adapt_on_names,
        with_polymorphic_discriminator=polymorphic_on,
        use_mapper_path=_use_mapper_path,
        represents_outer_join=outer,
    )

1075 

@property
def entity(self) -> AliasedClass[_O]:
    """Return the :class:`.AliasedClass` paired with this object.

    Only a weak reference to the AliasedClass is kept, which avoids a
    reference cycle but means it can be collected while this object is
    still in use.  When that has happened, a replacement is built from
    this object and the weak reference is re-pointed at it.
    """
    live = self._weak_entity()
    if live is not None:
        return live

    # the original was collected; rebuild one from our own state
    rebuilt = AliasedClass._reconstitute_from_aliased_insp(self)
    self._weak_entity = weakref.ref(rebuilt)
    return rebuilt

1089 

# class-level flag, fixed at True for AliasedInsp
is_aliased_class = True
"always returns True"

1092 

1093 def _memoized_method___clause_element__(self) -> FromClause: 

1094 return self.selectable._annotate( 

1095 { 

1096 "parentmapper": self.mapper, 

1097 "parententity": self, 

1098 "entity_namespace": self, 

1099 } 

1100 )._set_propagate_attrs( 

1101 {"compile_state_plugin": "orm", "plugin_subject": self} 

1102 ) 

1103 

@property
def entity_namespace(self) -> AliasedClass[_O]:
    """Return the namespace for this entity, which is ``self.entity``
    (the :class:`.AliasedClass`)."""
    return self.entity

1107 

@property
def class_(self) -> Type[_O]:
    """Return the mapped class ultimately represented by this
    :class:`.AliasedInsp`.

    This is the ``class_`` of ``self.mapper``.
    """
    return self.mapper.class_

1113 

1114 @property 

1115 def _path_registry(self) -> _AbstractEntityRegistry: 

1116 if self._use_mapper_path: 

1117 return self.mapper._path_registry 

1118 else: 

1119 return PathRegistry.per_mapper(self) 

1120 

1121 def __getstate__(self) -> Dict[str, Any]: 

1122 return { 

1123 "entity": self.entity, 

1124 "mapper": self.mapper, 

1125 "alias": self.selectable, 

1126 "name": self.name, 

1127 "adapt_on_names": self._adapt_on_names, 

1128 "with_polymorphic_mappers": self.with_polymorphic_mappers, 

1129 "with_polymorphic_discriminator": self.polymorphic_on, 

1130 "base_alias": self._base_alias(), 

1131 "use_mapper_path": self._use_mapper_path, 

1132 "represents_outer_join": self.represents_outer_join, 

1133 "nest_adapters": self._nest_adapters, 

1134 } 

1135 

1136 def __setstate__(self, state: Dict[str, Any]) -> None: 

1137 self.__init__( # type: ignore 

1138 state["entity"], 

1139 state["mapper"], 

1140 state["alias"], 

1141 state["name"], 

1142 state["with_polymorphic_mappers"], 

1143 state["with_polymorphic_discriminator"], 

1144 state["base_alias"], 

1145 state["use_mapper_path"], 

1146 state["adapt_on_names"], 

1147 state["represents_outer_join"], 

1148 state["nest_adapters"], 

1149 ) 

1150 

def _merge_with(self, other: AliasedInsp[_O]) -> AliasedInsp[_O]:
    """Combine the polymorphic class sets of ``self`` and ``other``.

    ``other`` is returned unchanged when both cover the same classes.
    Otherwise a new with-polymorphic alias over the union of classes is
    built, taking its discriminator, mapper-path flag and outer-join
    flag from ``other``.  Both objects must share the same primary
    mapper.
    """
    # assert self._is_with_polymorphic
    # assert other._is_with_polymorphic

    shared_mapper = other.mapper
    assert self.mapper is shared_mapper

    mine = util.to_set(mp.class_ for mp in self.with_polymorphic_mappers)
    theirs = {mp.class_ for mp in other.with_polymorphic_mappers}
    if mine == theirs:
        return other

    merged_mappers, merged_selectable = (
        shared_mapper._with_polymorphic_args(
            mine.union(theirs),
            None,
            innerjoin=not other.represents_outer_join,
        )
    )
    merged_selectable = merged_selectable._anonymous_fromclause(flat=True)

    merged_alias = AliasedClass(
        shared_mapper,
        merged_selectable,
        with_polymorphic_mappers=merged_mappers,
        with_polymorphic_discriminator=other.polymorphic_on,
        use_mapper_path=other._use_mapper_path,
        represents_outer_join=other.represents_outer_join,
    )
    return merged_alias._aliased_insp

1180 

def _adapt_element(
    self, expr: _ORMCOLEXPR, key: Optional[str] = None
) -> _ORMCOLEXPR:
    """Adapt a column expression to this alias and annotate it.

    :param expr: a :class:`.ColumnElement` to adapt.
    :param key: optional attribute name; recorded as the ``proxy_key``
     annotation when truthy and quoted in the warning below.
    :return: the adapted expression, or ``expr`` itself (after a warning)
     when the adapter did not find it in the selectable, annotated with
     this entity and mapper and set up for the "orm" plugin.
    """
    assert isinstance(expr, ColumnElement)

    annotations: Dict[str, Any] = {
        "parententity": self,
        "parentmapper": self.mapper,
    }
    if key:
        annotations["proxy_key"] = key

    # userspace adapt of an attribute from AliasedClass; validate that
    # it actually was present
    result = self._adapter.adapt_check_present(expr)
    if result is None:
        result = expr
        # the hint depends on whether name matching was already in use
        if self._adapter.adapt_on_names:
            message = (
                "Did not locate an expression in selectable for "
                "attribute %r; ensure name is correct in expression"
            )
        else:
            message = (
                "Did not locate an expression in selectable for "
                "attribute %r; to match by name, use the "
                "adapt_on_names parameter"
            )
        util.warn_limited(message, (key,))

    plugin_attrs = {"compile_state_plugin": "orm", "plugin_subject": self}
    return result._annotate(annotations)._set_propagate_attrs(plugin_attrs)

1214 

if TYPE_CHECKING:
    # establish compatibility with the _ORMAdapterProto protocol,
    # which in turn is compatible with _CoreAdapterProto.
    # (type checkers see this stub signature only)

    def _orm_adapt_element(
        self,
        obj: _CE,
        key: Optional[str] = None,
    ) -> _CE: ...

else:
    # at runtime the name is a plain alias of _adapt_element
    _orm_adapt_element = _adapt_element

1227 

1228 def _entity_for_mapper(self, mapper): 

1229 self_poly = self.with_polymorphic_mappers 

1230 if mapper in self_poly: 

1231 if mapper is self.mapper: 

1232 return self 

1233 else: 

1234 return getattr( 

1235 self.entity, mapper.class_.__name__ 

1236 )._aliased_insp 

1237 elif mapper.isa(self.mapper): 

1238 return self 

1239 else: 

1240 assert False, "mapper %s doesn't correspond to %s" % (mapper, self) 

1241 

1242 def _memoized_attr__get_clause(self): 

1243 onclause, replacemap = self.mapper._get_clause 

1244 return ( 

1245 self._adapter.traverse(onclause), 

1246 { 

1247 self._adapter.traverse(col): param 

1248 for col, param in replacemap.items() 

1249 }, 

1250 ) 

1251 

1252 def _memoized_attr__memoized_values(self): 

1253 return {} 

1254 

def _memoized_attr__all_column_expressions(self):
    """Return a :class:`.ColumnCollection` of the mapper's columns, each
    adapted to this alias.

    For a with-polymorphic entity the mappers of the polymorphic
    sub-entities are passed to ``_columns_plus_keys()``.
    """
    if not self._is_with_polymorphic:
        raw_pairs = self.mapper._columns_plus_keys()
    else:
        sub_mappers = [ent.mapper for ent in self._with_polymorphic_entities]
        raw_pairs = self.mapper._columns_plus_keys(sub_mappers)

    adapted_pairs = []
    for key, col in raw_pairs:
        adapted_pairs.append((key, self._adapt_element(col)))

    return ColumnCollection(adapted_pairs)

1268 

1269 def _memo(self, key, callable_, *args, **kw): 

1270 if key in self._memoized_values: 

1271 return self._memoized_values[key] 

1272 else: 

1273 self._memoized_values[key] = value = callable_(*args, **kw) 

1274 return value 

1275 

1276 def __repr__(self): 

1277 if self.with_polymorphic_mappers: 

1278 with_poly = "(%s)" % ", ".join( 

1279 mp.class_.__name__ for mp in self.with_polymorphic_mappers 

1280 ) 

1281 else: 

1282 with_poly = "" 

1283 return "<AliasedInsp at 0x%x; %s%s>" % ( 

1284 id(self), 

1285 self.class_.__name__, 

1286 with_poly, 

1287 ) 

1288 

1289 def __str__(self): 

1290 if self._is_with_polymorphic: 

1291 return "with_polymorphic(%s, [%s])" % ( 

1292 self._target.__name__, 

1293 ", ".join( 

1294 mp.class_.__name__ 

1295 for mp in self.with_polymorphic_mappers 

1296 if mp is not self.mapper 

1297 ), 

1298 ) 

1299 else: 

1300 return "aliased(%s)" % (self._target.__name__,) 

1301 

1302 

class _WrapUserEntity:
    """Proxy handed to the loader_criteria lambda in place of the entity.

    Attribute access is forwarded to the wrapped ``subject``, except
    that a ``declared_attr`` found directly in the subject's
    ``__dict__`` is invoked through its ``fget`` instead of going
    through the descriptor.  This bypasses ``declared_attr`` descriptors
    on unmapped mixins, which normally emit a warning for such use.

    might also be useful for other per-lambda instrumentations should
    the need arise.

    """

    __slots__ = ("subject",)

    def __init__(self, subject):
        self.subject = subject

    @util.preload_module("sqlalchemy.orm.decl_api")
    def __getattribute__(self, name):
        decl_api = util.preloaded.orm.decl_api

        # go through object.__getattribute__ so that fetching the
        # wrapped subject does not re-enter this method
        wrapped = object.__getattribute__(self, "subject")
        own_namespace = wrapped.__dict__

        if name in own_namespace:
            found = own_namespace[name]
            if isinstance(found, decl_api.declared_attr):
                return found.fget(wrapped)

        return getattr(wrapped, name)

1329 

1330 

class LoaderCriteriaOption(CriteriaOption):
    """Add additional WHERE criteria to the load for all occurrences of
    a particular entity.

    :class:`_orm.LoaderCriteriaOption` is invoked using the
    :func:`_orm.with_loader_criteria` function; see that function for
    details.

    .. versionadded:: 1.4

    """

    __slots__ = (
        "root_entity",
        "entity",
        "deferred_where_criteria",
        "where_criteria",
        "_where_crit_orig",
        "include_aliases",
        "propagate_to_loaders",
    )

    _traverse_internals = [
        ("root_entity", visitors.ExtendedInternalTraversal.dp_plain_obj),
        ("entity", visitors.ExtendedInternalTraversal.dp_has_cache_key),
        ("where_criteria", visitors.InternalTraversal.dp_clauseelement),
        ("include_aliases", visitors.InternalTraversal.dp_boolean),
        ("propagate_to_loaders", visitors.InternalTraversal.dp_boolean),
    ]

    root_entity: Optional[Type[Any]]
    entity: Optional[_InternalEntityType[Any]]
    where_criteria: Union[ColumnElement[bool], lambdas.DeferredLambdaElement]
    deferred_where_criteria: bool
    include_aliases: bool
    propagate_to_loaders: bool

    _where_crit_orig: Any

    def __init__(
        self,
        entity_or_base: _EntityType[Any],
        where_criteria: Union[
            _ColumnExpressionArgument[bool],
            Callable[[Any], _ColumnExpressionArgument[bool]],
        ],
        loader_only: bool = False,
        include_aliases: bool = False,
        propagate_to_loaders: bool = True,
        track_closure_variables: bool = True,
    ):
        """Construct the option.

        :param entity_or_base: an inspectable ORM entity, or any other
         class; the latter is kept as ``root_entity`` and later expanded
         through its subclasses.
        :param where_criteria: a SQL expression, or a callable that
         receives the entity and returns one.  A callable is wrapped in
         a ``DeferredLambdaElement``.
        :param loader_only: accepted but not read by this constructor.
        :param include_aliases: stored as given.
        :param propagate_to_loaders: stored as given.
        :param track_closure_variables: forwarded to ``LambdaOptions``
         for callable criteria.
        """
        inspected = cast(
            "_InternalEntityType[Any]",
            inspection.inspect(entity_or_base, False),
        )
        if inspected is not None:
            self.root_entity = None
            self.entity = inspected
        else:
            # not an ORM entity: keep the raw class
            self.root_entity = cast("Type[Any]", entity_or_base)
            self.entity = None

        # the original criteria are what __reduce__ pickles
        self._where_crit_orig = where_criteria

        if not callable(where_criteria):
            self.deferred_where_criteria = False
            self.where_criteria = coercions.expect(
                roles.WhereHavingRole, where_criteria
            )
        else:
            if self.root_entity is None:
                assert inspected is not None
                lambda_subject = inspected.entity
            else:
                lambda_subject = self.root_entity

            self.deferred_where_criteria = True
            self.where_criteria = lambdas.DeferredLambdaElement(
                where_criteria,
                roles.WhereHavingRole,
                lambda_args=(_WrapUserEntity(lambda_subject),),
                opts=lambdas.LambdaOptions(
                    track_closure_variables=track_closure_variables
                ),
            )

        self.include_aliases = include_aliases
        self.propagate_to_loaders = propagate_to_loaders

    @classmethod
    def _unreduce(
        cls, entity, where_criteria, include_aliases, propagate_to_loaders
    ):
        """Rebuild an option from the tuple produced by ``__reduce__``."""
        return LoaderCriteriaOption(
            entity,
            where_criteria,
            include_aliases=include_aliases,
            propagate_to_loaders=propagate_to_loaders,
        )

    def __reduce__(self):
        """Pickle as a call to ``_unreduce`` with the target class, the
        original (uncoerced) criteria and the two boolean flags."""
        if self.entity:
            target = self.entity.class_
        else:
            target = self.root_entity

        ctor_args = (
            target,
            self._where_crit_orig,
            self.include_aliases,
            self.propagate_to_loaders,
        )
        return (LoaderCriteriaOption._unreduce, ctor_args)

    def _all_mappers(self) -> Iterator[Mapper[Any]]:
        """Yield every mapper this option applies to.

        For an ORM entity that is its mapper and all descendants.  For a
        plain root class, the subclass tree is walked first-in-first-out:
        an inspectable subclass contributes its mapper and descendants,
        any other subclass has its own subclasses queued.
        """
        if self.entity:
            yield from self.entity.mapper.self_and_descendants
            return

        assert self.root_entity
        pending = list(self.root_entity.__subclasses__())
        while pending:
            candidate = pending.pop(0)
            candidate_insp = cast(
                "_InternalEntityType[Any]",
                inspection.inspect(candidate, raiseerr=False),
            )
            if not candidate_insp:
                pending.extend(candidate.__subclasses__())
            else:
                yield from candidate_insp.mapper.self_and_descendants

    def _should_include(self, compile_state: _ORMCompileState) -> bool:
        """Return False only for the statement annotated with this very
        option under ``"for_loader_criteria"``."""
        annotated_for = compile_state.select_statement._annotations.get(
            "for_loader_criteria", None
        )
        return annotated_for is not self

    def _resolve_where_criteria(
        self, ext_info: _InternalEntityType[Any]
    ) -> ColumnElement[bool]:
        """Return the criteria for ``ext_info``, deep-annotated with
        ``for_loader_criteria`` pointing at this option.

        Deferred (lambda) criteria are resolved against
        ``ext_info.entity`` first.
        """
        if not self.deferred_where_criteria:
            crit = self.where_criteria  # type: ignore
        else:
            crit = cast(
                "ColumnElement[bool]",
                self.where_criteria._resolve_with_args(ext_info.entity),
            )
        assert isinstance(crit, ColumnElement)

        marker = {"for_loader_criteria": self}
        return sql_util._deep_annotate(
            crit,
            marker,
            detect_subquery_cols=True,
            ind_cols_on_fromclause=True,
        )

    def process_compile_state_replaced_entities(
        self,
        compile_state: _ORMCompileState,
        mapper_entities: Iterable[_MapperEntity],
    ) -> None:
        """Delegate to :meth:`process_compile_state`; ``mapper_entities``
        is not used."""
        self.process_compile_state(compile_state)

    def process_compile_state(self, compile_state: _ORMCompileState) -> None:
        """Apply a modification to a given :class:`.CompileState`."""

        # if options to limit the criteria to immediate query only,
        # use compile_state.attributes instead

        self.get_global_criteria(compile_state.global_attributes)

    def get_global_criteria(self, attributes: Dict[Any, Any]) -> None:
        """Append this option to the ``("additional_entity_criteria",
        mapper)`` list in ``attributes`` for every applicable mapper,
        creating the list when absent."""
        for mp in self._all_mappers():
            criteria_key = ("additional_entity_criteria", mp)
            attributes.setdefault(criteria_key, []).append(self)

1508 

1509 

# inspecting an AliasedClass returns its ``_aliased_insp`` attribute
inspection._inspects(AliasedClass)(lambda target: target._aliased_insp)

1511 

1512 

@inspection._inspects(type)
def _inspect_mc(
    class_: Type[_O],
) -> Optional[Mapper[_O]]:
    """Inspection hook for plain classes.

    Returns the class's mapper, or ``None`` when the class has no class
    manager, is not mapped, or the lookup raises one of the
    ``exc.NO_STATE`` exceptions.
    """
    try:
        manager = opt_manager_of_class(class_)
        if manager is not None and manager.is_mapped:
            return manager.mapper
    except exc.NO_STATE:
        pass
    return None

1526 

1527 

# runtime type of a subscripted ``typing`` generic such as ``List[Any]``;
# used below as the inspection registration key
GenericAlias = type(List[Any])

1529 

1530 

@inspection._inspects(GenericAlias)
def _inspect_generic_alias(
    class_: Type[_O],
) -> Optional[Mapper[_O]]:
    """Inspection hook for subscripted generics: inspect the origin
    class (``get_origin(class_)``) as a plain class."""
    return _inspect_mc(cast("Type[_O]", get_origin(class_)))

1537 

1538 

@inspection._self_inspects
class Bundle(
    ORMColumnsClauseRole[_T],
    SupportsCloneAnnotations,
    MemoizedHasCacheKey,
    inspection.Inspectable["Bundle[_T]"],
    InspectionAttr,
):
    """A grouping of SQL expressions that are returned by a :class:`.Query`
    under one namespace.

    The :class:`.Bundle` essentially allows nesting of the tuple-based
    results returned by a column-oriented :class:`_query.Query` object.
    It also
    is extensible via simple subclassing, where the primary capability
    to override is that of how the set of expressions should be returned,
    allowing post-processing as well as custom return types, without
    involving ORM identity-mapped classes.

    .. seealso::

        :ref:`bundles`


    """

    single_entity = False
    """If True, queries for a single Bundle will be returned as a single
    entity, rather than an element within a keyed tuple."""

    is_clause_element = False

    is_mapper = False

    is_aliased_class = False

    is_bundle = True

    _propagate_attrs: _PropagateAttrsType = util.immutabledict()

    proxy_set = util.EMPTY_SET

    exprs: List[_ColumnsClauseElement]

    def __init__(
        self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
    ):
        r"""Construct a new :class:`.Bundle`.

        e.g.::

            bn = Bundle("mybundle", MyClass.x, MyClass.y)

            for row in session.query(bn).filter(bn.c.x == 5).filter(bn.c.y == 4):
                print(row.mybundle.x, row.mybundle.y)

        :param name: name of the bundle.
        :param \*exprs: columns or SQL expressions comprising the bundle.
        :param single_entity=False: if True, rows for this :class:`.Bundle`
         can be returned as a "single entity" outside of any enclosing tuple
         in the same manner as a mapped entity.

        """  # noqa: E501
        self.name = self._label = name
        coerced_exprs = [
            coercions.expect(
                roles.ColumnsClauseRole, expr, apply_propagate_attrs=self
            )
            for expr in exprs
        ]
        self.exprs = coerced_exprs

        # a nested Bundle arrives as a clause annotated with "bundle";
        # unwrap it so the collection holds the Bundle itself, keyed by
        # its ``_label`` when it has no ``key``
        self.c = self.columns = ColumnCollection(
            (getattr(col, "key", col._label), col)
            for col in [e._annotations.get("bundle", e) for e in coerced_exprs]
        ).as_readonly()
        self.single_entity = kw.pop("single_entity", self.single_entity)

    def _gen_cache_key(
        self, anon_map: anon_map, bindparams: List[BindParameter[Any]]
    ) -> Tuple[Any, ...]:
        """Return a cache key made of the class, name, ``single_entity``
        flag and the cache key of every member expression."""
        return (self.__class__, self.name, self.single_entity) + tuple(
            [expr._gen_cache_key(anon_map, bindparams) for expr in self.exprs]
        )

    @property
    def mapper(self) -> Optional[Mapper[Any]]:
        """The ``parentmapper`` annotation of the first expression, or
        ``None`` when that expression carries none."""
        mp: Optional[Mapper[Any]] = self.exprs[0]._annotations.get(
            "parentmapper", None
        )
        return mp

    @property
    def entity(self) -> Optional[_InternalEntityType[Any]]:
        """The ``parententity`` annotation of the first expression, or
        ``None`` when that expression carries none."""
        ie: Optional[_InternalEntityType[Any]] = self.exprs[
            0
        ]._annotations.get("parententity", None)
        return ie

    @property
    def entity_namespace(
        self,
    ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]:
        """The namespace for this bundle, which is :attr:`.Bundle.c`."""
        return self.c

    columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]

    """A namespace of SQL expressions referred to by this :class:`.Bundle`.

        e.g.::

            bn = Bundle("mybundle", MyClass.x, MyClass.y)

            q = sess.query(bn).filter(bn.c.x == 5)

        Nesting of bundles is also supported::

            b1 = Bundle(
                "b1",
                Bundle("b2", MyClass.a, MyClass.b),
                Bundle("b3", MyClass.x, MyClass.y),
            )

            q = sess.query(b1).filter(b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9)

    .. seealso::

        :attr:`.Bundle.c`

    """  # noqa: E501

    c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
    """An alias for :attr:`.Bundle.columns`."""

    def _clone(self, **kw):
        """Return a shallow copy: a new instance of the same class sharing
        this object's ``__dict__`` contents.  ``__init__`` is not run."""
        cloned = self.__class__.__new__(self.__class__)
        cloned.__dict__.update(self.__dict__)
        return cloned

    def __clause_element__(self):
        """Return a ``ClauseList`` of the member expressions, annotated
        with this bundle and set up for the "orm" compile-state plugin."""
        # ensure existing entity_namespace remains
        annotations = {"bundle": self, "entity_namespace": self}
        annotations.update(self._annotations)

        plugin_subject = self.exprs[0]._propagate_attrs.get(
            "plugin_subject", self.entity
        )
        return (
            expression.ClauseList(
                _literal_as_text_role=roles.ColumnsClauseRole,
                group=False,
                *[e._annotations.get("bundle", e) for e in self.exprs],
            )
            ._annotate(annotations)
            ._set_propagate_attrs(
                # the Bundle *must* use the orm plugin no matter what.  the
                # subject can be None but it's much better if it's not.
                {
                    "compile_state_plugin": "orm",
                    "plugin_subject": plugin_subject,
                }
            )
        )

    @property
    def clauses(self):
        """The ``clauses`` of :meth:`__clause_element__`."""
        return self.__clause_element__().clauses

    def label(self, name):
        """Provide a copy of this :class:`.Bundle` passing a new label."""

        cloned = self._clone()
        # keep ``_label`` equal to ``name``, as __init__ does: a parent
        # Bundle keys a nested bundle by ``_label``, so leaving the old
        # value here would file the relabelled copy under its old name.
        cloned.name = cloned._label = name
        return cloned

    def create_row_processor(
        self,
        query: Select[Unpack[TupleAny]],
        procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]],
        labels: Sequence[str],
    ) -> Callable[[Row[Unpack[TupleAny]]], Any]:
        """Produce the "row processing" function for this :class:`.Bundle`.

        May be overridden by subclasses to provide custom behaviors when
        results are fetched. The method is passed the statement object and a
        set of "row processor" functions at query execution time; these
        processor functions when given a result row will return the individual
        attribute value, which can then be adapted into any kind of return data
        structure.

        The example below illustrates replacing the usual :class:`.Row`
        return structure with a straight Python dictionary::

            from sqlalchemy.orm import Bundle


            class DictBundle(Bundle):
                def create_row_processor(self, query, procs, labels):
                    "Override create_row_processor to return values as dictionaries"

                    def proc(row):
                        return dict(zip(labels, (proc(row) for proc in procs)))

                    return proc

        A result from the above :class:`_orm.Bundle` will return dictionary
        values::

            bn = DictBundle("mybundle", MyClass.data1, MyClass.data2)
            for row in session.execute(select(bn).where(bn.c.data1 == "d1")):
                print(row.mybundle["data1"], row.mybundle["data2"])

        """  # noqa: E501
        # one empty-tuple entry per label
        keyed_tuple = result_tuple(labels, [() for _ in labels])

        def proc(row: Row[Unpack[TupleAny]]) -> Any:
            return keyed_tuple([proc(row) for proc in procs])

        return proc

1758 

1759 

def _orm_full_deannotate(element: _SA) -> _SA:
    """Return ``element`` passed through ``sql_util._deep_deannotate()``
    with no further arguments."""
    return sql_util._deep_deannotate(element)

1762 

1763 

1764class _ORMJoin(expression.Join): 

1765 """Extend Join to support ORM constructs as input.""" 

1766 

1767 __visit_name__ = expression.Join.__visit_name__ 

1768 

1769 inherit_cache = True 

1770 

1771 def __init__( 

1772 self, 

1773 left: _FromClauseArgument, 

1774 right: _FromClauseArgument, 

1775 onclause: Optional[_OnClauseArgument] = None, 

1776 isouter: bool = False, 

1777 full: bool = False, 

1778 _left_memo: Optional[Any] = None, 

1779 _right_memo: Optional[Any] = None, 

1780 _extra_criteria: Tuple[ColumnElement[bool], ...] = (), 

1781 ): 

1782 left_info = cast( 

1783 "Union[FromClause, _InternalEntityType[Any]]", 

1784 inspection.inspect(left), 

1785 ) 

1786 

1787 right_info = cast( 

1788 "Union[FromClause, _InternalEntityType[Any]]", 

1789 inspection.inspect(right), 

1790 ) 

1791 adapt_to = right_info.selectable 

1792 

1793 # used by joined eager loader 

1794 self._left_memo = _left_memo 

1795 self._right_memo = _right_memo 

1796 

1797 if isinstance(onclause, attributes.QueryableAttribute): 

1798 if TYPE_CHECKING: 

1799 assert isinstance( 

1800 onclause.comparator, RelationshipProperty.Comparator 

1801 ) 

1802 on_selectable = onclause.comparator._source_selectable() 

1803 prop = onclause.property 

1804 _extra_criteria += onclause._extra_criteria 

1805 elif isinstance(onclause, MapperProperty): 

1806 # used internally by joined eager loader...possibly not ideal 

1807 prop = onclause 

1808 on_selectable = prop.parent.selectable 

1809 else: 

1810 prop = None 

1811 on_selectable = None 

1812 

1813 left_selectable = left_info.selectable 

1814 if prop: 

1815 adapt_from: Optional[FromClause] 

1816 if sql_util.clause_is_present(on_selectable, left_selectable): 

1817 adapt_from = on_selectable 

1818 else: 

1819 assert isinstance(left_selectable, FromClause) 

1820 adapt_from = left_selectable 

1821 

1822 ( 

1823 pj, 

1824 sj, 

1825 source, 

1826 dest, 

1827 secondary, 

1828 target_adapter, 

1829 ) = prop._create_joins( 

1830 source_selectable=adapt_from, 

1831 dest_selectable=adapt_to, 

1832 source_polymorphic=True, 

1833 of_type_entity=right_info, 

1834 alias_secondary=True, 

1835 extra_criteria=_extra_criteria, 

1836 ) 

1837 

1838 if sj is not None: 

1839 if isouter: 

1840 # note this is an inner join from secondary->right 

1841 right = sql.join(secondary, right, sj) 

1842 onclause = pj 

1843 else: 

1844 left = sql.join(left, secondary, pj, isouter) 

1845 onclause = sj 

1846 else: 

1847 onclause = pj 

1848 

1849 self._target_adapter = target_adapter 

1850 

1851 # we don't use the normal coercions logic for _ORMJoin 

1852 # (probably should), so do some gymnastics to get the entity. 

1853 # logic here is for #8721, which was a major bug in 1.4 

1854 # for almost two years, not reported/fixed until 1.4.43 (!) 

1855 if is_selectable(left_info): 

1856 parententity = left_selectable._annotations.get( 

1857 "parententity", None 

1858 ) 

1859 elif insp_is_mapper(left_info) or insp_is_aliased_class(left_info): 

1860 parententity = left_info 

1861 else: 

1862 parententity = None 

1863 

1864 if parententity is not None: 

1865 self._annotations = self._annotations.union( 

1866 {"parententity": parententity} 

1867 ) 

1868 

1869 augment_onclause = bool(_extra_criteria) and not prop 

1870 expression.Join.__init__(self, left, right, onclause, isouter, full) 

1871 

1872 assert self.onclause is not None 

1873 

1874 if augment_onclause: 

1875 self.onclause &= sql.and_(*_extra_criteria) 

1876 

1877 if ( 

1878 not prop 

1879 and getattr(right_info, "mapper", None) 

1880 and right_info.mapper.single # type: ignore 

1881 ): 

1882 right_info = cast("_InternalEntityType[Any]", right_info) 

1883 # if single inheritance target and we are using a manual 

1884 # or implicit ON clause, augment it the same way we'd augment the 

1885 # WHERE. 

1886 single_crit = right_info.mapper._single_table_criterion 

1887 if single_crit is not None: 

1888 if insp_is_aliased_class(right_info): 

1889 single_crit = right_info._adapter.traverse(single_crit) 

1890 self.onclause = self.onclause & single_crit 

1891 

1892 def _splice_into_center(self, other): 

1893 """Splice a join into the center. 

1894 

1895 Given join(a, b) and join(b, c), return join(a, b).join(c) 

1896 

1897 """ 

1898 leftmost = other 

1899 while isinstance(leftmost, sql.Join): 

1900 leftmost = leftmost.left 

1901 

1902 assert self.right is leftmost 

1903 

1904 left = _ORMJoin( 

1905 self.left, 

1906 other.left, 

1907 self.onclause, 

1908 isouter=self.isouter, 

1909 _left_memo=self._left_memo, 

1910 _right_memo=other._left_memo._path_registry, 

1911 ) 

1912 

1913 return _ORMJoin( 

1914 left, 

1915 other.right, 

1916 other.onclause, 

1917 isouter=other.isouter, 

1918 _right_memo=other._right_memo, 

1919 ) 

1920 

def join(
    self,
    right: _FromClauseArgument,
    onclause: Optional[_OnClauseArgument] = None,
    isouter: bool = False,
    full: bool = False,
) -> _ORMJoin:
    """Return a new :class:`_ORMJoin` with this join as its left side.

    :param right: the element to join to.
    :param onclause: optional ON clause, passed through as given.
    :param isouter: passed through as the new join's ``isouter`` flag.
    :param full: passed through as the new join's ``full`` flag.

    """
    joined = _ORMJoin(
        self,
        right,
        onclause,
        isouter=isouter,
        full=full,
    )
    return joined

1929 

def outerjoin(
    self,
    right: _FromClauseArgument,
    onclause: Optional[_OnClauseArgument] = None,
    full: bool = False,
) -> _ORMJoin:
    """Return a new :class:`_ORMJoin` with this join as its left side
    and ``isouter`` set to True.

    :param right: the element to join to.
    :param onclause: optional ON clause, passed through as given.
    :param full: passed through as the new join's ``full`` flag.

    """
    joined = _ORMJoin(
        self,
        right,
        onclause,
        full=full,
        isouter=True,
    )
    return joined

1937 

1938 

def with_parent(
    instance: object,
    prop: attributes.QueryableAttribute[Any],
    from_entity: Optional[_EntityType[Any]] = None,
) -> ColumnElement[bool]:
    """Create filtering criterion that relates this query's primary entity
    to the given related instance, using established
    :func:`_orm.relationship()`
    configuration.

    E.g.::

        stmt = select(Address).where(with_parent(some_user, User.addresses))

    The SQL rendered is the same as that rendered when a lazy loader
    would fire off from the given parent on that attribute, meaning
    that the appropriate state is taken from the parent object in
    Python without the need to render joins to the parent table
    in the rendered statement.

    The given property may also make use of :meth:`_orm.PropComparator.of_type`
    to indicate the left side of the criteria::


        a1 = aliased(Address)
        a2 = aliased(Address)
        stmt = select(a1, a2).where(with_parent(u1, User.addresses.of_type(a2)))

    The above use is equivalent to using the
    :func:`_orm.with_parent.from_entity` argument::

        a1 = aliased(Address)
        a2 = aliased(Address)
        stmt = select(a1, a2).where(
            with_parent(u1, User.addresses, from_entity=a2)
        )

    :param instance:
      An instance which has some :func:`_orm.relationship`.

    :param prop:
      Class-bound attribute, which indicates
      what relationship from the instance should be used to reconcile the
      parent/child relationship.  Strings are rejected with
      :class:`.ArgumentError`, as is a class-bound attribute whose
      property is not a relationship.

    :param from_entity:
      Entity in which to consider as the left side.  This defaults to the
      "zero" entity of the :class:`_query.Query` itself.  When ``prop``
      carries an ``of_type()`` entity, that entity is used instead.

    """  # noqa: E501
    relationship_prop: RelationshipProperty[Any]

    if isinstance(prop, str):
        raise sa_exc.ArgumentError(
            "with_parent() accepts class-bound mapped attributes, not strings"
        )

    if isinstance(prop, attributes.QueryableAttribute):
        # an of_type() entity on the attribute overrides from_entity
        of_type_entity = prop._of_type
        if of_type_entity:
            from_entity = of_type_entity

        candidate = prop.property
        if candidate is None or not prop_is_relationship(candidate):
            raise sa_exc.ArgumentError(
                f"Expected relationship property for with_parent(), "
                f"got {candidate}"
            )
        relationship_prop = candidate
    else:
        # anything else is used directly as the relationship property
        relationship_prop = prop

    return relationship_prop._with_parent(instance, from_entity=from_entity)

2011 

2012 

def has_identity(object_: object) -> bool:
    """Return True if the given object has a database
    identity.

    This typically corresponds to the object being
    in either the persistent or detached state.

    The value is read from the ``has_identity`` attribute of the
    object's instance state.

    .. seealso::

        :func:`.was_deleted`

    """
    return attributes.instance_state(object_).has_identity

2027 

2028 

def was_deleted(object_: object) -> bool:
    """Return True if the given object was deleted
    within a session flush.

    This is regardless of whether or not the object is
    persistent or detached.

    The value is read from the ``was_deleted`` attribute of the
    object's instance state.

    .. seealso::

        :attr:`.InstanceState.was_deleted`

    """
    return attributes.instance_state(object_).was_deleted

2044 

2045 

def _entity_corresponds_to(
    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
) -> bool:
    """determine if 'given' corresponds to 'entity', in terms
    of an entity passed to Query that would match the same entity
    being referred to elsewhere in the query.

    """
    if insp_is_aliased_class(entity):
        # an aliased entity matches only another alias that shares
        # the same base alias
        if not insp_is_aliased_class(given):
            return False
        return entity._base_alias() is given._base_alias()

    if insp_is_aliased_class(given):
        # 'entity' is not aliased here, 'given' is
        if given._use_mapper_path:
            return entity in given.with_polymorphic_mappers
        return entity is given

    # neither side is aliased
    assert insp_is_mapper(given)
    return entity.common_parent(given)

2067 

2068 

def _entity_corresponds_to_use_path_impl(
    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
) -> bool:
    """determine if 'given' corresponds to 'entity', in terms
    of a path of loader options where a mapped attribute is taken to
    be a member of a parent entity.

    e.g.::

        someoption(A).someoption(A.b)  # -> fn(A, A) -> True
        someoption(A).someoption(C.d)  # -> fn(A, C) -> False

        a1 = aliased(A)
        someoption(a1).someoption(A.b)  # -> fn(a1, A) -> False
        someoption(a1).someoption(a1.b)  # -> fn(a1, a1) -> True

        wp = with_polymorphic(A, [A1, A2])
        someoption(wp).someoption(A1.foo)  # -> fn(wp, A1) -> False
        someoption(wp).someoption(wp.A1.foo)  # -> fn(wp, wp.A1) -> True

    """
    if insp_is_aliased_class(given):
        # an aliased 'given' needs an aliased 'entity' that does not
        # use the mapper path, and that is either the same object or one
        # of given's with-polymorphic entities
        return (
            insp_is_aliased_class(entity)
            and not entity._use_mapper_path
            and (given is entity or entity in given._with_polymorphic_entities)
        )

    if not insp_is_aliased_class(entity):
        # neither side is aliased
        return given.isa(entity.mapper)

    # only 'entity' is aliased
    return (
        entity._use_mapper_path
        and given in entity.with_polymorphic_mappers
    )

2103 

2104 

2105def _entity_isa(given: _InternalEntityType[Any], mapper: Mapper[Any]) -> bool: 

2106 """determine if 'given' "is a" mapper, in terms of the given 

2107 would load rows of type 'mapper'. 

2108 

2109 """ 

2110 if given.is_aliased_class: 

2111 return mapper in given.with_polymorphic_mappers or given.mapper.isa( 

2112 mapper 

2113 ) 

2114 elif given.with_polymorphic_mappers: 

2115 return mapper in given.with_polymorphic_mappers or given.isa(mapper) 

2116 else: 

2117 return given.isa(mapper) 

2118 

2119 

2120def _getitem(iterable_query: Query[Any], item: Any) -> Any: 

2121 """calculate __getitem__ in terms of an iterable query object 

2122 that also has a slice() method. 

2123 

2124 """ 

2125 

2126 def _no_negative_indexes(): 

2127 raise IndexError( 

2128 "negative indexes are not accepted by SQL " 

2129 "index / slice operators" 

2130 ) 

2131 

2132 if isinstance(item, slice): 

2133 start, stop, step = util.decode_slice(item) 

2134 

2135 if ( 

2136 isinstance(stop, int) 

2137 and isinstance(start, int) 

2138 and stop - start <= 0 

2139 ): 

2140 return [] 

2141 

2142 elif (isinstance(start, int) and start < 0) or ( 

2143 isinstance(stop, int) and stop < 0 

2144 ): 

2145 _no_negative_indexes() 

2146 

2147 res = iterable_query.slice(start, stop) 

2148 if step is not None: 

2149 return list(res)[None : None : item.step] 

2150 else: 

2151 return list(res) 

2152 else: 

2153 if item == -1: 

2154 _no_negative_indexes() 

2155 else: 

2156 return list(iterable_query[item : item + 1])[0] 

2157 

2158 

def _is_mapped_annotation(
    raw_annotation: _AnnotationScanType,
    cls: Type[Any],
    originating_cls: Type[Any],
) -> bool:
    """Return True if the annotation, once de-stringified against the
    module of ``originating_cls``, has ``_MappedAnnotationBase`` as its
    origin.

    An annotation whose names can't be resolved (``NameError``) is
    reported as not mapped rather than raising.

    """
    module_name = originating_cls.__module__
    try:
        resolved = de_stringify_annotation(cls, raw_annotation, module_name)
    except NameError:
        # in most cases, at least within our own tests, we can raise
        # here, which is more accurate as it prevents us from returning
        # false negatives.  However, in the real world, try to avoid getting
        # involved with end-user annotations that have nothing to do with us.
        # see issue #8888 where we bypass using this function in the case
        # that we want to detect an unresolvable Mapped[] type.
        return False

    return is_origin_of_cls(resolved, _MappedAnnotationBase)

2178 

2179 

2180class _CleanupError(Exception): 

2181 pass 

2182 

2183 

def _cleanup_mapped_str_annotation(
    annotation: str, originating_module: str
) -> str:
    """Quote the innermost name of a string ``Mapped[...]`` style
    annotation so that it can later be read as a string.

    :param annotation: the annotation in string form.
    :param originating_module: module name passed to ``eval_name_only``
     when resolving the outermost container name.
    :return: the rewritten annotation, or the annotation unchanged when it
     is not of the form ``Name[...]``, when the outer name does not resolve
     to ``typing.ClassVar`` or a ``_MappedAnnotationBase`` subclass, or
     when the innermost element is already quoted or contains brackets.
    :raises _CleanupError: if the outermost container name can't be
     resolved.

    """
    # fix up an annotation that comes in as the form:
    # 'Mapped[List[Address]]'  so that it instead looks like:
    # 'Mapped[List["Address"]]' , which will allow us to get
    # "Address" as a string

    # additionally, resolve symbols for these names since this is where
    # we'd have to do it

    inner: Optional[Match[str]]

    # group(1): a leading name containing no space or "|";
    # group(2): everything between the outermost brackets
    mm = re.match(r"^([^ \|]+?)\[(.+)\]$", annotation)

    if not mm:
        return annotation

    # ticket #8759.  Resolve the Mapped name to a real symbol.
    # originally this just checked the name.
    try:
        obj = eval_name_only(mm.group(1), originating_module)
    except NameError as ne:
        raise _CleanupError(
            f'For annotation "{annotation}", could not resolve '
            f'container type "{mm.group(1)}".  '
            "Please ensure this type is imported at the module level "
            "outside of TYPE_CHECKING blocks"
        ) from ne

    if obj is typing.ClassVar:
        real_symbol = "ClassVar"
    else:
        try:
            if issubclass(obj, _MappedAnnotationBase):
                # use the class's own name, whatever name the annotation
                # spelled it with
                real_symbol = obj.__name__
            else:
                return annotation
        except TypeError:
            # avoid isinstance(obj, type) check, just catch TypeError
            return annotation

    # note: if one of the codepaths above didn't define real_symbol and
    # then didn't return, real_symbol raises UnboundLocalError
    # which is actually a NameError, and the calling routines don't
    # notice this since they are catching NameError anyway.   Just in case
    # this is being modified in the future, something to be aware of.

    # peel off one "Name[...]" layer per iteration; the outermost name is
    # replaced by real_symbol, and the innermost remainder becomes the
    # last stack entry
    stack = []
    inner = mm
    while True:
        stack.append(real_symbol if mm is inner else inner.group(1))
        g2 = inner.group(2)
        inner = re.match(r"^([^ \|]+?)\[(.+)\]$", g2)
        if inner is None:
            stack.append(g2)
            break

    # stacks we want to rewrite, that is, quote the last entry which
    # we think is a relationship class name:
    #
    #   ['Mapped', 'List', 'Address']
    #   ['Mapped', 'A']
    #
    # stacks we dont want to rewrite, which are generally MappedColumn
    # use cases:
    #
    # ['Mapped', "'Optional[Dict[str, str]]'"]
    # ['Mapped', 'dict[str, str] | None']

    if (
        # avoid already quoted symbols such as
        # ['Mapped', "'Optional[Dict[str, str]]'"]
        not re.match(r"""^["'].*["']$""", stack[-1])
        # avoid further generics like Dict[] such as
        # ['Mapped', 'dict[str, str] | None'],
        # ['Mapped', 'list[int] | list[str]'],
        # ['Mapped', 'Union[list[int], list[str]]'],
        and not re.search(r"[\[\]]", stack[-1])
    ):
        # quote each comma-separated element of the last entry
        # individually, stripping surrounding quotes and spaces first
        stripchars = "\"' "
        stack[-1] = ", ".join(
            f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",")
        )

        # NOTE(review): nesting of this rebuild under the ``if`` is
        # assumed; confirm against the canonical source layout
        annotation = "[".join(stack) + ("]" * (len(stack) - 1))

    return annotation

2272 

2273 

2274def _extract_mapped_subtype( 

2275 raw_annotation: Optional[_AnnotationScanType], 

2276 cls: type, 

2277 originating_module: str, 

2278 key: str, 

2279 attr_cls: Type[Any], 

2280 required: bool, 

2281 is_dataclass_field: bool, 

2282 expect_mapped: bool = True, 

2283 raiseerr: bool = True, 

2284) -> Optional[Tuple[Union[_AnnotationScanType, str], Optional[type]]]: 

2285 """given an annotation, figure out if it's ``Mapped[something]`` and if 

2286 so, return the ``something`` part. 

2287 

2288 Includes error raise scenarios and other options. 

2289 

2290 """ 

2291 

2292 if raw_annotation is None: 

2293 if required: 

2294 raise orm_exc.MappedAnnotationError( 

2295 f"Python typing annotation is required for attribute " 

2296 f'"{cls.__name__}.{key}" when primary argument(s) for ' 

2297 f'"{attr_cls.__name__}" construct are None or not present' 

2298 ) 

2299 return None 

2300 

2301 try: 

2302 # destringify the "outside" of the annotation. note we are not 

2303 # adding include_generic so it will *not* dig into generic contents, 

2304 # which will remain as ForwardRef or plain str under future annotations 

2305 # mode. The full destringify happens later when mapped_column goes 

2306 # to do a full lookup in the registry type_annotations_map. 

2307 annotated = de_stringify_annotation( 

2308 cls, 

2309 raw_annotation, 

2310 originating_module, 

2311 str_cleanup_fn=_cleanup_mapped_str_annotation, 

2312 ) 

2313 except _CleanupError as ce: 

2314 raise orm_exc.MappedAnnotationError( 

2315 f"Could not interpret annotation {raw_annotation}. " 

2316 "Check that it uses names that are correctly imported at the " 

2317 "module level. See chained stack trace for more hints." 

2318 ) from ce 

2319 except NameError as ne: 

2320 if raiseerr and "Mapped[" in raw_annotation: # type: ignore 

2321 raise orm_exc.MappedAnnotationError( 

2322 f"Could not interpret annotation {raw_annotation}. " 

2323 "Check that it uses names that are correctly imported at the " 

2324 "module level. See chained stack trace for more hints." 

2325 ) from ne 

2326 

2327 annotated = raw_annotation # type: ignore 

2328 

2329 if is_dataclass_field: 

2330 return annotated, None 

2331 else: 

2332 if not hasattr(annotated, "__origin__") or not is_origin_of_cls( 

2333 annotated, _MappedAnnotationBase 

2334 ): 

2335 if expect_mapped: 

2336 if not raiseerr: 

2337 return None 

2338 

2339 origin = getattr(annotated, "__origin__", None) 

2340 if origin is typing.ClassVar: 

2341 return None 

2342 

2343 # check for other kind of ORM descriptor like AssociationProxy, 

2344 # don't raise for that (issue #9957) 

2345 elif isinstance(origin, type) and issubclass( 

2346 origin, ORMDescriptor 

2347 ): 

2348 return None 

2349 

2350 raise orm_exc.MappedAnnotationError( 

2351 f'Type annotation for "{cls.__name__}.{key}" ' 

2352 "can't be correctly interpreted for " 

2353 "Annotated Declarative Table form. ORM annotations " 

2354 "should normally make use of the ``Mapped[]`` generic " 

2355 "type, or other ORM-compatible generic type, as a " 

2356 "container for the actual type, which indicates the " 

2357 "intent that the attribute is mapped. " 

2358 "Class variables that are not intended to be mapped " 

2359 "by the ORM should use ClassVar[]. " 

2360 "To allow Annotated Declarative to disregard legacy " 

2361 "annotations which don't use Mapped[] to pass, set " 

2362 '"__allow_unmapped__ = True" on the class or a ' 

2363 "superclass this class.", 

2364 code="zlpr", 

2365 ) 

2366 

2367 else: 

2368 return annotated, None 

2369 

2370 generic_annotated = cast(GenericProtocol[Any], annotated) 

2371 if len(generic_annotated.__args__) != 1: 

2372 raise orm_exc.MappedAnnotationError( 

2373 "Expected sub-type for Mapped[] annotation" 

2374 ) 

2375 

2376 return ( 

2377 # fix dict/list/set args to be ForwardRef, see #11814 

2378 fixup_container_fwd_refs(generic_annotated.__args__[0]), 

2379 generic_annotated.__origin__, 

2380 ) 

2381 

2382 

def _mapper_property_as_plain_name(prop: Type[Any]) -> str:
    """Return the plain name for ``prop`` as produced by
    ``util.clsname_as_plain_name``.

    When ``prop`` has a ``_mapper_property_name()`` callable, its result is
    passed along as the name; otherwise ``None`` is passed.

    """
    explicit_name = (
        prop._mapper_property_name()
        if hasattr(prop, "_mapper_property_name")
        else None
    )
    return util.clsname_as_plain_name(prop, explicit_name)