Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/util.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

832 statements  

1# orm/util.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9from __future__ import annotations 

10 

11import enum 

12import functools 

13import re 

14import types 

15import typing 

16from typing import AbstractSet 

17from typing import Any 

18from typing import Callable 

19from typing import cast 

20from typing import Dict 

21from typing import FrozenSet 

22from typing import Generic 

23from typing import get_origin 

24from typing import Iterable 

25from typing import Iterator 

26from typing import List 

27from typing import Literal 

28from typing import Match 

29from typing import Optional 

30from typing import Protocol 

31from typing import Sequence 

32from typing import Tuple 

33from typing import Type 

34from typing import TYPE_CHECKING 

35from typing import TypeVar 

36from typing import Union 

37import weakref 

38 

39from . import attributes # noqa 

40from . import exc as orm_exc 

41from ._typing import _O 

42from ._typing import insp_is_aliased_class 

43from ._typing import insp_is_mapper 

44from ._typing import prop_is_relationship 

45from .base import _class_to_mapper as _class_to_mapper 

46from .base import _MappedAnnotationBase 

47from .base import _never_set as _never_set # noqa: F401 

48from .base import _none_only_set as _none_only_set # noqa: F401 

49from .base import _none_set as _none_set # noqa: F401 

50from .base import attribute_str as attribute_str # noqa: F401 

51from .base import class_mapper as class_mapper 

52from .base import DynamicMapped 

53from .base import InspectionAttr as InspectionAttr 

54from .base import instance_str as instance_str # noqa: F401 

55from .base import Mapped 

56from .base import object_mapper as object_mapper 

57from .base import object_state as object_state # noqa: F401 

58from .base import opt_manager_of_class 

59from .base import ORMDescriptor 

60from .base import state_attribute_str as state_attribute_str # noqa: F401 

61from .base import state_class_str as state_class_str # noqa: F401 

62from .base import state_str as state_str # noqa: F401 

63from .base import WriteOnlyMapped 

64from .interfaces import CriteriaOption 

65from .interfaces import MapperProperty as MapperProperty 

66from .interfaces import ORMColumnsClauseRole 

67from .interfaces import ORMEntityColumnsClauseRole 

68from .interfaces import ORMFromClauseRole 

69from .path_registry import PathRegistry as PathRegistry 

70from .. import event 

71from .. import exc as sa_exc 

72from .. import inspection 

73from .. import sql 

74from .. import util 

75from ..engine.result import result_tuple 

76from ..sql import coercions 

77from ..sql import expression 

78from ..sql import lambdas 

79from ..sql import roles 

80from ..sql import util as sql_util 

81from ..sql import visitors 

82from ..sql._typing import is_selectable 

83from ..sql.annotation import SupportsCloneAnnotations 

84from ..sql.base import WriteableColumnCollection 

85from ..sql.cache_key import HasCacheKey 

86from ..sql.cache_key import MemoizedHasCacheKey 

87from ..sql.elements import ColumnElement 

88from ..sql.elements import KeyedColumnElement 

89from ..sql.selectable import FromClause 

90from ..util.langhelpers import MemoizedSlots 

91from ..util.typing import de_stringify_annotation as _de_stringify_annotation 

92from ..util.typing import eval_name_only as _eval_name_only 

93from ..util.typing import fixup_container_fwd_refs 

94from ..util.typing import GenericProtocol 

95from ..util.typing import is_origin_of_cls 

96from ..util.typing import TupleAny 

97from ..util.typing import Unpack 

98 

99if typing.TYPE_CHECKING: 

100 from ._typing import _EntityType 

101 from ._typing import _IdentityKeyType 

102 from ._typing import _InternalEntityType 

103 from ._typing import _ORMCOLEXPR 

104 from .context import _MapperEntity 

105 from .context import _ORMCompileState 

106 from .mapper import Mapper 

107 from .path_registry import _AbstractEntityRegistry 

108 from .query import Query 

109 from .relationships import RelationshipProperty 

110 from ..engine import Row 

111 from ..engine import RowMapping 

112 from ..sql._typing import _CE 

113 from ..sql._typing import _ColumnExpressionArgument 

114 from ..sql._typing import _EquivalentColumnMap 

115 from ..sql._typing import _FromClauseArgument 

116 from ..sql._typing import _OnClauseArgument 

117 from ..sql._typing import _PropagateAttrsType 

118 from ..sql.annotation import _SA 

119 from ..sql.base import ReadOnlyColumnCollection 

120 from ..sql.elements import BindParameter 

121 from ..sql.selectable import _ColumnsClauseElement 

122 from ..sql.selectable import Select 

123 from ..sql.selectable import Selectable 

124 from ..sql.visitors import anon_map 

125 from ..util.typing import _AnnotationScanType 

126 from ..util.typing import _MatchedOnType 

127 

_T = TypeVar("_T", bound=Any)

# Every token recognized in a cascade specification.  "all" and "none" are
# shorthands that CascadeOptions expands / clears rather than stores.
all_cascades = frozenset(
    {
        "all",
        "none",
        "save-update",
        "merge",
        "expunge",
        "refresh-expire",
        "delete",
        "delete-orphan",
    }
)

142 

# A "partial factory": ``_de_stringify_partial(fn)`` evaluates to
# ``functools.partial(fn, locals_=<mapping below>)``, i.e. it pre-binds the
# ``locals_`` keyword of ``fn`` to an immutable mapping exposing the ORM
# annotation classes under their plain names.
_de_stringify_partial = functools.partial(
    functools.partial,
    locals_=util.immutabledict(
        {
            "Mapped": Mapped,
            "WriteOnlyMapped": WriteOnlyMapped,
            "DynamicMapped": DynamicMapped,
        }
    ),
)

# partial is practically useless as we have to write out the whole
# function and maintain the signature anyway (the typed signatures of the
# partially-applied callables are spelled out by hand as Protocol classes
# and applied with cast()).

156 

157 

class _DeStringifyAnnotation(Protocol):
    """Typed call signature applied to :func:`de_stringify_annotation`.

    Declares the parameters that remain for the caller once ``locals_`` has
    been pre-bound by ``_de_stringify_partial``.
    """

    def __call__(
        self,
        cls: Type[Any],
        annotation: _AnnotationScanType,
        originating_module: str,
        *,
        str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
        include_generic: bool = False,
    ) -> _MatchedOnType: ...


# the util-level function with ``locals_`` pre-bound; cast() only supplies
# the static signature, the runtime object is the functools.partial itself
de_stringify_annotation = cast(
    _DeStringifyAnnotation, _de_stringify_partial(_de_stringify_annotation)
)

173 

174 

class _EvalNameOnly(Protocol):
    """Typed call signature applied to :func:`eval_name_only`.

    Declares the parameters that remain for the caller once ``locals_`` has
    been pre-bound by ``_de_stringify_partial``.
    """

    def __call__(self, name: str, module_name: str) -> Any: ...


# the util-level function with ``locals_`` pre-bound; cast() only supplies
# the static signature, the runtime object is the functools.partial itself
eval_name_only = cast(_EvalNameOnly, _de_stringify_partial(_eval_name_only))

180 

181 

class CascadeOptions(FrozenSet[str]):
    """Immutable set of the cascade names passed to
    :paramref:`.relationship.cascade`.

    Besides behaving as a frozenset of the effective cascade names, each
    individual cascade is exposed as a boolean attribute (``save_update``,
    ``delete``, ``refresh_expire``, ``merge``, ``expunge``,
    ``delete_orphan``) assigned at construction time.

    """

    # cascades switched on by the "all" shorthand
    _add_w_all_cascades = all_cascades - {"all", "none", "delete-orphan"}
    _allowed_cascades = all_cascades

    _viewonly_cascades = ["expunge", "all", "none", "refresh-expire", "merge"]

    __slots__ = (
        "save_update",
        "delete",
        "refresh_expire",
        "merge",
        "expunge",
        "delete_orphan",
    )

    save_update: bool
    delete: bool
    refresh_expire: bool
    merge: bool
    expunge: bool
    delete_orphan: bool

    def __new__(
        cls, value_list: Optional[Union[Iterable[str], str]]
    ) -> CascadeOptions:
        """Build the option set from an iterable of names, a
        comma-separated string, or ``None``.

        :raises sqlalchemy.exc.ArgumentError: if any name is not one of
         the allowed cascades.
        """
        # strings and None are routed through from_string(), which calls
        # back into this constructor with a list of names
        if value_list is None or isinstance(value_list, str):
            return cls.from_string(value_list)  # type: ignore

        names = set(value_list)
        unknown = names.difference(cls._allowed_cascades)
        if unknown:
            raise sa_exc.ArgumentError(
                "Invalid cascade option(s): %s"
                % ", ".join(repr(name) for name in sorted(unknown))
            )

        # expand "all" first, so that "none" wins when both are present
        if "all" in names:
            names.update(cls._add_w_all_cascades)
        if "none" in names:
            names.clear()
        names.discard("all")

        self = super().__new__(cls, names)
        for attr_name, cascade in (
            ("save_update", "save-update"),
            ("delete", "delete"),
            ("refresh_expire", "refresh-expire"),
            ("merge", "merge"),
            ("expunge", "expunge"),
            ("delete_orphan", "delete-orphan"),
        ):
            setattr(self, attr_name, cascade in names)

        if self.delete_orphan and not self.delete:
            util.warn("The 'delete-orphan' cascade option requires 'delete'.")
        return self

    def __repr__(self):
        return "CascadeOptions(%r)" % (",".join(sorted(self)),)

    @classmethod
    def from_string(cls, arg):
        """Construct from a comma-separated string; ``None`` and the empty
        string produce an empty option set."""
        pieces = re.split(r"\s*,\s*", arg or "")
        return cls(list(filter(None, pieces)))

253 

254 

def _validator_events(desc, key, validator, include_removes, include_backrefs):
    """Register attribute event listeners on ``desc`` that pass incoming
    values through ``validator``.

    :param desc: event target handed to :func:`.event.listen`.
    :param key: attribute key; passed to the validator and used to locate
     this attribute's ``impl`` on ``state.manager``.
    :param validator: callable invoked as ``validator(obj, key, value)``,
     or as ``validator(obj, key, value, is_remove)`` when
     ``include_removes`` is set.
    :param include_removes: if True, the validator receives the extra
     boolean argument and a "remove" listener is established as well.
    :param include_backrefs: if False, events whose initiator belongs to
     a different attribute implementation are passed through without
     validation.
    """

    # extra positional argument given to the validator for non-remove
    # operations; empty when the validator takes three arguments only
    not_a_remove = (False,) if include_removes else ()

    def originates_locally(state, initiator):
        # only consulted when include_backrefs is False
        impl = state.manager[key].impl
        return initiator.impl is impl

    def append(state, value, initiator):
        # bulk replace operations are validated by bulk_set() instead
        if initiator.op is attributes.OP_BULK_REPLACE or not (
            include_backrefs or originates_locally(state, initiator)
        ):
            return value
        return validator(state.obj(), key, value, *not_a_remove)

    def bulk_set(state, values, initiator):
        if include_backrefs or originates_locally(state, initiator):
            obj = state.obj()
            # replace the contents of the incoming list in place
            values[:] = [
                validator(obj, key, value, *not_a_remove) for value in values
            ]

    def set_(state, value, oldvalue, initiator):
        if not (include_backrefs or originates_locally(state, initiator)):
            return value
        return validator(state.obj(), key, value, *not_a_remove)

    if include_removes:

        def remove(state, value, initiator):
            if include_backrefs or originates_locally(state, initiator):
                validator(state.obj(), key, value, True)

    event.listen(desc, "append", append, raw=True, retval=True)
    event.listen(desc, "bulk_replace", bulk_set, raw=True)
    event.listen(desc, "set", set_, raw=True, retval=True)
    if include_removes:
        event.listen(desc, "remove", remove, raw=True, retval=True)

319 

320 

def polymorphic_union(
    table_map, typecolname, aliasname="p_union", cast_nulls=True
):
    """Create a ``UNION`` statement used by a polymorphic mapper.

    See :ref:`concrete_inheritance` for an example of how
    this is used.

    One SELECT is produced per entry of ``table_map``; each SELECT lists
    the union of all column keys found across the tables, substituting a
    labeled NULL for keys a particular table does not have, and the
    SELECTs are combined using ``UNION ALL``.

    :param table_map: mapping of polymorphic identities to
     :class:`_schema.Table` objects.  The values are coerced to FROM
     clauses and written back into this mapping.
    :param typecolname: string name of a "discriminator" column, which will be
     derived from the query, producing the polymorphic identity for
     each row.  If ``None``, no polymorphic discriminator is generated.
    :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()`
     construct generated.
    :param cast_nulls: if True, non-existent columns, which are represented
     as labeled NULLs, will be passed into CAST.   This is a legacy behavior
     that is problematic on some backends such as Oracle - in which case it
     can be set to False.
    :raises sqlalchemy.exc.InvalidRequestError: if a table column's key
     equals ``typecolname``.

    """

    colnames: util.OrderedSet[str] = util.OrderedSet()
    columns_by_table = {}
    column_types = {}

    for identity in table_map:
        selectable = coercions.expect(
            roles.FromClauseRole, table_map[identity]
        )
        # store the coerced FROM clause back for the second pass below
        table_map[identity] = selectable

        by_key = {}
        for column in selectable.c:
            if column.key == typecolname:
                raise sa_exc.InvalidRequestError(
                    "Polymorphic union can't use '%s' as the discriminator "
                    "column due to mapped column %r; please apply the "
                    "'typecolname' "
                    "argument; this is available on "
                    "ConcreteBase as '_concrete_discriminator_name'"
                    % (typecolname, column)
                )
            colnames.add(column.key)
            by_key[column.key] = column
            column_types[column.key] = column.type
        columns_by_table[selectable] = by_key

    # how a NULL placeholder is given the type of the missing column
    typed_null = sql.cast if cast_nulls else sql.type_coerce

    def col(name, table):
        try:
            return columns_by_table[table][name]
        except KeyError:
            return typed_null(sql.null(), column_types[name]).label(name)

    selects = []
    for type_, table in table_map.items():
        select_columns = [col(name, table) for name in colnames]
        if typecolname is not None:
            # the polymorphic identity, rendered as a literal column
            select_columns.append(
                sql.literal_column(sql_util._quote_ddl_expr(type_)).label(
                    typecolname
                )
            )
        selects.append(sql.select(*select_columns).select_from(table))
    return sql.union_all(*selects).alias(aliasname)

399 

400 

def identity_key(
    class_: Optional[Type[_T]] = None,
    ident: Union[Any, Tuple[Any, ...]] = None,
    *,
    instance: Optional[_T] = None,
    row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None,
    identity_token: Optional[Any] = None,
) -> _IdentityKeyType[_T]:
    r"""Generate "identity key" tuples, as are used as keys in the
    :attr:`.Session.identity_map` dictionary.

    Three calling styles are accepted:

    * ``identity_key(class, ident, identity_token=token)``

      Receives a mapped class along with a primary key given as a scalar
      or a tuple.

      E.g.::

        >>> identity_key(MyClass, (1, 2))
        (<class '__main__.MyClass'>, (1, 2), None)

      :param class: mapped class (must be a positional argument)
      :param ident: primary key, may be a scalar or tuple argument.
      :param identity_token: optional identity token

    * ``identity_key(instance=instance)``

      Produces the identity key for a given instance.  The instance need
      not be persistent, only that its primary key attributes are
      populated (else the key will contain ``None`` for those missing
      values).

      E.g.::

        >>> instance = MyClass(1, 2)
        >>> identity_key(instance=instance)
        (<class '__main__.MyClass'>, (1, 2), None)

      In this form, the given instance is ultimately run though
      :meth:`_orm.Mapper.identity_key_from_instance`, which will have the
      effect of performing a database check for the corresponding row
      if the object is expired.  The ``identity_token`` argument is not
      consulted in this form.

      :param instance: object instance (must be given as a keyword arg)

    * ``identity_key(class, row=row, identity_token=token)``

      Like the class/tuple form, but the primary key is taken from a
      database result row given as a :class:`.Row` or
      :class:`.RowMapping` object.  When both ``row`` and ``ident`` are
      given, ``row`` is used.

      E.g.::

        >>> row = engine.execute(
        ...     text("select * from table where a=1 and b=2")
        ... ).first()
        >>> identity_key(MyClass, row=row)
        (<class '__main__.MyClass'>, (1, 2), None)

      :param class: mapped class (must be a positional argument)
      :param row: :class:`.Row` row returned by a
       :class:`_engine.CursorResult` (must be given as a keyword arg)
      :param identity_token: optional identity token

    :raises sqlalchemy.exc.ArgumentError: if neither a class nor an
     instance is given, or if a class is given without ``ident`` or
     ``row``.

    """
    if class_ is None:
        # instance form
        if instance is None:
            raise sa_exc.ArgumentError("class or instance is required")
        return object_mapper(instance).identity_key_from_instance(instance)

    mapper = class_mapper(class_)

    if row is not None:
        return mapper.identity_key_from_row(
            row, identity_token=identity_token
        )

    if ident is None:
        raise sa_exc.ArgumentError("ident or row is required")

    # a scalar primary key is normalized to a one-element tuple
    return mapper.identity_key_from_primary_key(
        tuple(util.to_list(ident)), identity_token=identity_token
    )

482 

483 

484class _TraceAdaptRole(enum.Enum): 

485 """Enumeration of all the use cases for ORMAdapter. 

486 

487 ORMAdapter remains one of the most complicated aspects of the ORM, as it is 

488 used for in-place adaption of column expressions to be applied to a SELECT, 

489 replacing :class:`.Table` and other objects that are mapped to classes with 

490 aliases of those tables in the case of joined eager loading, or in the case 

491 of polymorphic loading as used with concrete mappings or other custom "with 

492 polymorphic" parameters, with whole user-defined subqueries. The 

493 enumerations provide an overview of all the use cases used by ORMAdapter, a 

494 layer of formality as to the introduction of new ORMAdapter use cases (of 

495 which none are anticipated), as well as a means to trace the origins of a 

496 particular ORMAdapter within runtime debugging. 

497 

498 SQLAlchemy 2.0 has greatly scaled back ORM features which relied heavily on 

499 open-ended statement adaption, including the ``Query.with_polymorphic()`` 

500 method and the ``Query.select_from_entity()`` methods, favoring 

501 user-explicit aliasing schemes using the ``aliased()`` and 

502 ``with_polymorphic()`` standalone constructs; these still use adaption, 

503 however the adaption is applied in a narrower scope. 

504 

505 """ 

506 

507 # aliased() use that is used to adapt individual attributes at query 

508 # construction time 

509 ALIASED_INSP = enum.auto() 

510 

511 # joinedload cases; typically adapt an ON clause of a relationship 

512 # join 

513 JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto() 

514 JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto() 

515 JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto() 

516 

517 # polymorphic cases - these are complex ones that replace FROM 

518 # clauses, replacing tables with subqueries 

519 MAPPER_POLYMORPHIC_ADAPTER = enum.auto() 

520 WITH_POLYMORPHIC_ADAPTER = enum.auto() 

521 WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto() 

522 DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto() 

523 

524 # the from_statement() case, used only to adapt individual attributes 

525 # from a given statement to local ORM attributes at result fetching 

526 # time. assigned to ORMCompileState._from_obj_alias 

527 ADAPT_FROM_STATEMENT = enum.auto() 

528 

529 # the joinedload for queries that have LIMIT/OFFSET/DISTINCT case; 

530 # the query is placed inside of a subquery with the LIMIT/OFFSET/etc., 

531 # joinedloads are then placed on the outside. 

532 # assigned to ORMCompileState.compound_eager_adapter 

533 COMPOUND_EAGER_STATEMENT = enum.auto() 

534 

535 # the legacy Query._set_select_from() case. 

536 # this is needed for Query's set operations (i.e. UNION, etc. ) 

537 # as well as "legacy from_self()", which while removed from 2.0 as 

538 # public API, is used for the Query.count() method. this one 

539 # still does full statement traversal 

540 # assigned to ORMCompileState._from_obj_alias 

541 LEGACY_SELECT_FROM_ALIAS = enum.auto() 

542 

543 

class ORMStatementAdapter(sql_util.ColumnAdapter):
    """ColumnAdapter which includes a role attribute."""

    __slots__ = ("role",)

    def __init__(
        self,
        role: _TraceAdaptRole,
        selectable: Selectable,
        *,
        equivalents: Optional[_EquivalentColumnMap] = None,
        adapt_required: bool = False,
        allow_label_resolve: bool = True,
        anonymize_labels: bool = False,
        adapt_on_names: bool = False,
        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
    ):
        """Record ``role`` and construct the underlying ColumnAdapter.

        :param role: the :class:`._TraceAdaptRole` member naming the use
         case for this adapter; stored as ``self.role``.
        :param selectable: first positional argument of the superclass
         constructor.

        All keyword-only arguments are forwarded to the superclass
        constructor unchanged, under the same names.
        """
        self.role = role
        super().__init__(
            selectable,
            equivalents=equivalents,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            adapt_on_names=adapt_on_names,
            adapt_from_selectables=adapt_from_selectables,
        )

571 

572 

class ORMAdapter(sql_util.ColumnAdapter):
    """ColumnAdapter subclass which excludes adaptation of entities from
    non-matching mappers.

    """

    __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp")

    is_aliased_class: bool
    aliased_insp: Optional[AliasedInsp[Any]]

    def __init__(
        self,
        role: _TraceAdaptRole,
        entity: _InternalEntityType[Any],
        *,
        equivalents: Optional[_EquivalentColumnMap] = None,
        adapt_required: bool = False,
        allow_label_resolve: bool = True,
        anonymize_labels: bool = False,
        selectable: Optional[Selectable] = None,
        limit_on_entity: bool = True,
        adapt_on_names: bool = False,
        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
    ):
        """Construct an adapter for ``entity``.

        :param role: the :class:`._TraceAdaptRole` member naming the use
         case for this adapter; stored as ``self.role``.
        :param entity: inspected entity; its ``mapper`` is stored as
         ``self.mapper``, and when it is an aliased-class inspection it
         is additionally stored as ``self.aliased_insp``.
        :param selectable: selectable to adapt to; defaults to
         ``entity.selectable``.
        :param limit_on_entity: when True, :meth:`._include_fn` is passed
         to the superclass as ``include_fn``; when False, ``None`` is
         passed.

        The remaining keyword arguments are forwarded to the superclass
        constructor unchanged.
        """
        self.role = role
        self.mapper = entity.mapper

        if insp_is_aliased_class(entity):
            self.is_aliased_class, self.aliased_insp = True, entity
        else:
            self.is_aliased_class, self.aliased_insp = False, None

        super().__init__(
            entity.selectable if selectable is None else selectable,
            equivalents=equivalents,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            include_fn=self._include_fn if limit_on_entity else None,
            adapt_on_names=adapt_on_names,
            adapt_from_selectables=adapt_from_selectables,
        )

    def _include_fn(self, elem):
        """Return a true value if ``elem`` carries no "parentmapper"
        annotation, or if that mapper and ``self.mapper`` are related
        through ``isa()`` in either direction."""
        parent = elem._annotations.get("parentmapper", None)
        if not parent:
            return True
        return parent.isa(self.mapper) or self.mapper.isa(parent)

624 

625 

class AliasedClass(
    inspection.Inspectable["AliasedInsp[_O]"], ORMColumnsClauseRole[_O]
):
    r"""Represents an "aliased" form of a mapped class for usage with Query.

    The ORM equivalent of a :func:`~sqlalchemy.sql.expression.alias`
    construct, this object mimics the mapped class using a
    ``__getattr__`` scheme and maintains a reference to a
    real :class:`~sqlalchemy.sql.expression.Alias` object.

    A primary purpose of :class:`.AliasedClass` is to serve as an alternate
    within a SQL statement generated by the ORM, such that an existing
    mapped entity can be used in multiple contexts.   A simple example::

        # find all pairs of users with the same name
        user_alias = aliased(User)
        session.query(User, user_alias).join(
            (user_alias, User.id > user_alias.id)
        ).filter(User.name == user_alias.name)

    :class:`.AliasedClass` is also capable of mapping an existing mapped
    class to an entirely new selectable, provided this selectable is column-
    compatible with the existing mapped selectable, and it can also be
    configured in a mapping as the target of a :func:`_orm.relationship`.
    See the links below for examples.

    The :class:`.AliasedClass` object is constructed typically using the
    :func:`_orm.aliased` function.   It also is produced with additional
    configuration when using the :func:`_orm.with_polymorphic` function.

    The resulting object is an instance of :class:`.AliasedClass`.
    This object implements an attribute scheme which produces the
    same attribute and method interface as the original mapped
    class, allowing :class:`.AliasedClass` to be compatible
    with any attribute technique which works on the original class,
    including hybrid attributes (see :ref:`hybrids_toplevel`).

    The :class:`.AliasedClass` can be inspected for its underlying
    :class:`_orm.Mapper`, aliased selectable, and other information
    using :func:`_sa.inspect`::

        from sqlalchemy import inspect

        my_alias = aliased(MyClass)
        insp = inspect(my_alias)

    The resulting inspection object is an instance of :class:`.AliasedInsp`.


    .. seealso::

        :func:`.aliased`

        :func:`.with_polymorphic`

        :ref:`relationship_aliased_class`

        :ref:`relationship_to_window_function`


    """

    __name__: str

    def __init__(
        self,
        mapped_class_or_ac: _EntityType[_O],
        alias: Optional[FromClause] = None,
        name: Optional[str] = None,
        flat: bool = False,
        adapt_on_names: bool = False,
        with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]] = None,
        with_polymorphic_discriminator: Optional[ColumnElement[Any]] = None,
        base_alias: Optional[AliasedInsp[Any]] = None,
        use_mapper_path: bool = False,
        represents_outer_join: bool = False,
    ):
        """Construct the alias along with its :class:`.AliasedInsp`.

        :param mapped_class_or_ac: the entity to alias; it is passed to
         :func:`_sa.inspect` to obtain the mapper.
        :param alias: FROM clause to use.  When ``None``, one is generated:
         an alias of the inspected entity's own selectable if that entity
         is itself aliased against a subquery, otherwise an anonymous
         FROM clause of the mapper's with-polymorphic selectable.
        :param name: passed to the generated anonymous FROM clause and to
         the :class:`.AliasedInsp`.
        :param flat: passed to the generated anonymous FROM clause.
        :param with_polymorphic_mappers: defaults to the mapper's
         ``with_polymorphic_mappers`` when empty or ``None``.
        :param with_polymorphic_discriminator: defaults to the mapper's
         ``polymorphic_on`` when ``None``.

        ``adapt_on_names``, ``base_alias``, ``use_mapper_path`` and
        ``represents_outer_join`` are handed to :class:`.AliasedInsp`
        unchanged.
        """
        insp = cast(
            "_InternalEntityType[_O]", inspection.inspect(mapped_class_or_ac)
        )
        mapper = insp.mapper

        nest_adapters = False

        if alias is None:
            if insp.is_aliased_class and insp.selectable._is_subquery:
                alias = insp.selectable.alias()
            else:
                alias = (
                    mapper._with_polymorphic_selectable._anonymous_fromclause(
                        name=name,
                        flat=flat,
                    )
                )
        elif insp.is_aliased_class:
            # an explicit alias was given for something already aliased
            nest_adapters = True

        assert alias is not None
        self._aliased_insp = AliasedInsp(
            self,
            insp,
            alias,
            name,
            (
                with_polymorphic_mappers
                if with_polymorphic_mappers
                else mapper.with_polymorphic_mappers
            ),
            (
                with_polymorphic_discriminator
                if with_polymorphic_discriminator is not None
                else mapper.polymorphic_on
            ),
            base_alias,
            use_mapper_path,
            adapt_on_names,
            represents_outer_join,
            nest_adapters,
        )

        self.__name__ = f"aliased({mapper.class_.__name__})"

    @classmethod
    def _reconstitute_from_aliased_insp(
        cls, aliased_insp: AliasedInsp[_O]
    ) -> AliasedClass[_O]:
        """Build an :class:`.AliasedClass` around an existing
        :class:`.AliasedInsp` without running ``__init__``.

        For a with-polymorphic inspection, each sub-entity other than
        ``aliased_insp`` itself is reconstituted recursively and set as
        an attribute named after its class.
        """
        obj = cls.__new__(cls)
        obj.__name__ = f"aliased({aliased_insp.mapper.class_.__name__})"
        obj._aliased_insp = aliased_insp

        if aliased_insp._is_with_polymorphic:
            for sub_aliased_insp in aliased_insp._with_polymorphic_entities:
                if sub_aliased_insp is not aliased_insp:
                    ent = AliasedClass._reconstitute_from_aliased_insp(
                        sub_aliased_insp
                    )
                    setattr(obj, sub_aliased_insp.class_.__name__, ent)

        return obj

    def __getattr__(self, key: str) -> Any:
        """Proxy attribute access to the aliased target.

        Adapted :class:`.QueryableAttribute`-style results are stored on
        this object so that later lookups of the same name do not reach
        ``__getattr__`` again.

        :raises AttributeError: if this object has no ``_aliased_insp``
         yet (e.g. it was created via ``__new__`` only), or if the target
         does not have the attribute.
        """
        try:
            _aliased_insp = self.__dict__["_aliased_insp"]
        except KeyError:
            # not initialized yet; report the attribute that was asked
            # for, and don't expose the internal KeyError as context
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {key!r}"
            ) from None
        else:
            target = _aliased_insp._target
            # maintain all getattr mechanics
            attr = getattr(target, key)

        # attribute is a method, that will be invoked against a
        # "self"; so just return a new method with the same function and
        # new self
        if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
            return types.MethodType(attr.__func__, self)

        # attribute is a descriptor, that will be invoked against a
        # "self"; so invoke the descriptor against this self
        if hasattr(attr, "__get__"):
            attr = attr.__get__(None, self)

        # attributes within the QueryableAttribute system will want this
        # to be invoked so the object can be adapted
        if hasattr(attr, "adapt_to_entity"):
            attr = attr.adapt_to_entity(_aliased_insp)
            setattr(self, key, attr)

        return attr

    def _get_from_serialized(
        self, key: str, mapped_class: _O, aliased_insp: AliasedInsp[_O]
    ) -> Any:
        """Variant of ``__getattr__`` which is given the mapped class and
        the :class:`.AliasedInsp` explicitly instead of reading them from
        this object's state."""
        # this method is only used in terms of the
        # sqlalchemy.ext.serializer extension
        attr = getattr(mapped_class, key)
        if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
            return types.MethodType(attr.__func__, self)

        # attribute is a descriptor, that will be invoked against a
        # "self"; so invoke the descriptor against this self
        if hasattr(attr, "__get__"):
            attr = attr.__get__(None, self)

        # attributes within the QueryableAttribute system will want this
        # to be invoked so the object can be adapted
        if hasattr(attr, "adapt_to_entity"):
            # point the inspection back at this object before adapting
            aliased_insp._weak_entity = weakref.ref(self)
            attr = attr.adapt_to_entity(aliased_insp)
            setattr(self, key, attr)

        return attr

    def __repr__(self) -> str:
        return "<AliasedClass at 0x%x; %s>" % (
            id(self),
            self._aliased_insp._target.__name__,
        )

    def __str__(self) -> str:
        return str(self._aliased_insp)

823 

824 def __str__(self) -> str: 

825 return str(self._aliased_insp) 

826 

827 

828@inspection._self_inspects 

829class AliasedInsp( 

830 ORMEntityColumnsClauseRole[_O], 

831 ORMFromClauseRole, 

832 HasCacheKey, 

833 InspectionAttr, 

834 MemoizedSlots, 

835 inspection.Inspectable["AliasedInsp[_O]"], 

836 Generic[_O], 

837): 

838 """Provide an inspection interface for an 

839 :class:`.AliasedClass` object. 

840 

841 The :class:`.AliasedInsp` object is returned 

842 given an :class:`.AliasedClass` using the 

843 :func:`_sa.inspect` function:: 

844 

845 from sqlalchemy import inspect 

846 from sqlalchemy.orm import aliased 

847 

848 my_alias = aliased(MyMappedClass) 

849 insp = inspect(my_alias) 

850 

851 Attributes on :class:`.AliasedInsp` 

852 include: 

853 

854 * ``entity`` - the :class:`.AliasedClass` represented. 

855 * ``mapper`` - the :class:`_orm.Mapper` mapping the underlying class. 

856 * ``selectable`` - the :class:`_expression.Alias` 

857 construct which ultimately 

858 represents an aliased :class:`_schema.Table` or 

859 :class:`_expression.Select` 

860 construct. 

861 * ``name`` - the name of the alias. Also is used as the attribute 

862 name when returned in a result tuple from :class:`_query.Query`. 

863 * ``with_polymorphic_mappers`` - collection of :class:`_orm.Mapper` 

864 objects 

865 indicating all those mappers expressed in the select construct 

866 for the :class:`.AliasedClass`. 

867 * ``polymorphic_on`` - an alternate column or SQL expression which 

868 will be used as the "discriminator" for a polymorphic load. 

869 

870 .. seealso:: 

871 

872 :ref:`inspection_toplevel` 

873 

874 """ 

875 

# names of all per-instance attributes.  "__weakref__" is listed so that
# instances can be the target of a weak reference; ``__init__`` passes
# ``self`` to ``weakref.ref()`` when no base alias is supplied.
__slots__ = (
    "__weakref__",
    "_weak_entity",
    "mapper",
    "selectable",
    "name",
    "_adapt_on_names",
    "with_polymorphic_mappers",
    "polymorphic_on",
    "_use_mapper_path",
    "_base_alias",
    "represents_outer_join",
    "persist_selectable",
    "local_table",
    "_is_with_polymorphic",
    "_with_polymorphic_entities",
    "_adapter",
    "_target",
    "__clause_element__",
    "_memoized_values",
    "_all_column_expressions",
    "_nest_adapters",
)

# (attribute name, visitors traversal symbol) pairs.
# NOTE(review): presumably consumed by the cache key machinery of a base
# class that is not visible in this part of the file -- confirm.
_cache_key_traversal = [
    ("name", visitors.ExtendedInternalTraversal.dp_string),
    ("_adapt_on_names", visitors.ExtendedInternalTraversal.dp_boolean),
    ("_use_mapper_path", visitors.ExtendedInternalTraversal.dp_boolean),
    ("_target", visitors.ExtendedInternalTraversal.dp_inspectable),
    ("selectable", visitors.ExtendedInternalTraversal.dp_clauseelement),
    (
        "with_polymorphic_mappers",
        visitors.InternalTraversal.dp_has_cache_key_list,
    ),
    ("polymorphic_on", visitors.InternalTraversal.dp_clauseelement),
]

# static typing declarations for attributes assigned in ``__init__``
mapper: Mapper[_O]
selectable: FromClause
_adapter: ORMAdapter
with_polymorphic_mappers: Sequence[Mapper[Any]]
_with_polymorphic_entities: Sequence[AliasedInsp[Any]]

_weak_entity: weakref.ref[AliasedClass[_O]]
"""the AliasedClass that refers to this AliasedInsp"""

_target: Union[Type[_O], AliasedClass[_O]]
"""the thing referenced by the AliasedClass/AliasedInsp.

In the vast majority of cases, this is the mapped class.  However
it may also be another AliasedClass (alias of alias).

"""

929 

def __init__(
    self,
    entity: AliasedClass[_O],
    inspected: _InternalEntityType[_O],
    selectable: FromClause,
    name: Optional[str],
    with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]],
    polymorphic_on: Optional[ColumnElement[Any]],
    _base_alias: Optional[AliasedInsp[Any]],
    _use_mapper_path: bool,
    adapt_on_names: bool,
    represents_outer_join: bool,
    nest_adapters: bool,
):
    """Set up the inspection state for an :class:`.AliasedClass`.

    :param entity: the :class:`.AliasedClass` this object belongs to;
     only a weak reference to it is stored.
    :param inspected: inspection object of the aliased thing; its
     ``.entity`` becomes ``_target`` and its ``.mapper`` becomes
     ``mapper``.
    :param selectable: the FROM element; stored as ``selectable``,
     ``persist_selectable`` and ``local_table`` alike.
    :param name: name of the alias.
    :param with_polymorphic_mappers: when non-empty, a sub-entity is
     created for every mapper in the sequence other than the primary
     one.
    :param polymorphic_on: stored as ``polymorphic_on``.
    :param _base_alias: alias this one is derived from; when not given,
     this object refers (weakly) to itself.
    :param _use_mapper_path: stored as ``_use_mapper_path`` and passed on
     to sub-entities.
    :param adapt_on_names: passed to the :class:`.ORMAdapter` and to
     sub-entities.
    :param represents_outer_join: stored as ``represents_outer_join``.
    :param nest_adapters: when True, ``inspected`` must itself be an
     :class:`.AliasedInsp`; its adapter is wrapped around the new one.

    """
    mapped_class_or_ac = inspected.entity
    mapper = inspected.mapper

    # must be assigned before the ``self.entity`` property is used in the
    # polymorphic loop below
    self._weak_entity = weakref.ref(entity)
    self.mapper = mapper
    self.selectable = self.persist_selectable = self.local_table = (
        selectable
    )
    self.name = name
    self.polymorphic_on = polymorphic_on
    # held weakly; refers to this object itself when no base alias is given
    self._base_alias = weakref.ref(_base_alias or self)
    self._use_mapper_path = _use_mapper_path
    self.represents_outer_join = represents_outer_join
    self._nest_adapters = nest_adapters

    if with_polymorphic_mappers:
        self._is_with_polymorphic = True
        self.with_polymorphic_mappers = with_polymorphic_mappers
        self._with_polymorphic_entities = []
        for poly in self.with_polymorphic_mappers:
            if poly is not mapper:
                # sub-entity sharing the same selectable, with this
                # object as its base alias
                ent = AliasedClass(
                    poly.class_,
                    selectable,
                    base_alias=self,
                    adapt_on_names=adapt_on_names,
                    use_mapper_path=_use_mapper_path,
                )

                # expose the sub-entity on the AliasedClass under the
                # name of the mapped class
                setattr(self.entity, poly.class_.__name__, ent)
                self._with_polymorphic_entities.append(ent._aliased_insp)

    else:
        # note ``_with_polymorphic_entities`` is not assigned on this path
        self._is_with_polymorphic = False
        self.with_polymorphic_mappers = [mapper]

    self._adapter = ORMAdapter(
        _TraceAdaptRole.ALIASED_INSP,
        mapper,
        selectable=selectable,
        equivalents=mapper._equivalent_columns,
        adapt_on_names=adapt_on_names,
        anonymize_labels=True,
        # make sure the adapter doesn't try to grab other tables that
        # are not even the thing we are mapping, such as embedded
        # selectables in subqueries or CTEs.  See issue #6060
        adapt_from_selectables={
            m.selectable
            for m in self.with_polymorphic_mappers
            if not adapt_on_names
        },
        limit_on_entity=False,
    )

    if nest_adapters:
        # supports "aliased class of aliased class" use case
        assert isinstance(inspected, AliasedInsp)
        self._adapter = inspected._adapter.wrap(self._adapter)

    self._adapt_on_names = adapt_on_names
    self._target = mapped_class_or_ac

1005 

@classmethod
def _alias_factory(
    cls,
    element: Union[_EntityType[_O], FromClause],
    alias: Optional[FromClause] = None,
    name: Optional[str] = None,
    flat: bool = False,
    adapt_on_names: bool = False,
) -> Union[AliasedClass[_O], FromClause]:
    """Produce an alias of ``element``.

    Anything that is not a :class:`.FromClause` is wrapped in an
    :class:`.AliasedClass`.  A :class:`.FromClause` is aliased directly:
    by name when ``name`` is given, anonymously otherwise.

    :raises sqlalchemy.exc.ArgumentError: if ``adapt_on_names`` is set
     for a :class:`.FromClause`.

    """
    if not isinstance(element, FromClause):
        return AliasedClass(
            element,
            alias=alias,
            flat=flat,
            name=name,
            adapt_on_names=adapt_on_names,
        )

    if adapt_on_names:
        raise sa_exc.ArgumentError(
            "adapt_on_names only applies to ORM elements"
        )

    if name:
        return element.alias(name=name, flat=flat)

    # see selectable.py->Alias._factory() for similar
    # mypy issue.  Cannot get the overload to see this
    # in mypy (works fine in pyright)
    return coercions.expect(  # type: ignore[no-any-return]
        roles.AnonymizedFromClauseRole, element, flat=flat
    )

1037 

@classmethod
def _with_polymorphic_factory(
    cls,
    base: Union[Type[_O], Mapper[_O]],
    classes: Union[Literal["*"], Iterable[_EntityType[Any]]],
    selectable: Union[Literal[False, None], FromClause] = False,
    flat: bool = False,
    polymorphic_on: Optional[ColumnElement[Any]] = None,
    aliased: bool = False,
    innerjoin: bool = False,
    adapt_on_names: bool = False,
    name: Optional[str] = None,
    _use_mapper_path: bool = False,
) -> AliasedClass[_O]:
    """Build the :class:`.AliasedClass` for a ``with_polymorphic()`` call.

    The mappers and selectable come from the primary mapper's
    ``_with_polymorphic_args()``; the selectable is made anonymous when
    ``aliased`` or ``flat`` is set.

    :raises sqlalchemy.exc.ArgumentError: if both ``flat`` and an explicit
     ``selectable`` are given.

    """
    primary_mapper = _class_to_mapper(base)

    explicit_selectable = selectable not in (None, False)
    if explicit_selectable and flat:
        raise sa_exc.ArgumentError(
            "the 'flat' and 'selectable' arguments cannot be passed "
            "simultaneously to with_polymorphic()"
        )

    mappers, poly_selectable = primary_mapper._with_polymorphic_args(
        classes, selectable, innerjoin=innerjoin
    )
    if aliased or flat:
        assert poly_selectable is not None
        poly_selectable = poly_selectable._anonymous_fromclause(flat=flat)

    return AliasedClass(
        base,
        poly_selectable,
        name=name,
        with_polymorphic_mappers=mappers,
        adapt_on_names=adapt_on_names,
        with_polymorphic_discriminator=polymorphic_on,
        use_mapper_path=_use_mapper_path,
        # an outer join is represented unless innerjoin was requested
        represents_outer_join=not innerjoin,
    )

1077 

@property
def entity(self) -> AliasedClass[_O]:
    """Return the :class:`.AliasedClass` belonging to this object.

    The AliasedClass is only referenced weakly, which avoids reference
    cycles but means it can be lost, particularly when it was created
    internally and only the AliasedInsp is passed around.  In that case
    a new one is generated on demand and weakly referenced in turn; it
    is a simple class with very little initial state on it.

    """
    existing = self._weak_entity()
    if existing is not None:
        return existing

    rebuilt = AliasedClass._reconstitute_from_aliased_insp(self)
    self._weak_entity = weakref.ref(rebuilt)
    return rebuilt

1091 

# class-level constant flag; not assigned per instance
is_aliased_class = True
"always returns True"

1094 

def _memoized_method___clause_element__(self) -> FromClause:
    """Return the selectable annotated as belonging to this entity.

    The annotations name this object as parent entity and entity
    namespace and its mapper as parent mapper; the propagated attributes
    select the "orm" compile state plugin with this object as subject.

    """
    annotations = {
        "parentmapper": self.mapper,
        "parententity": self,
        "entity_namespace": self,
    }
    annotated = self.selectable._annotate(annotations)

    propagate_attrs = {"compile_state_plugin": "orm", "plugin_subject": self}
    return annotated._set_propagate_attrs(propagate_attrs)

1105 

@property
def entity_namespace(self) -> AliasedClass[_O]:
    """Return the attribute namespace, which is the value of
    :attr:`.AliasedInsp.entity`."""
    namespace = self.entity
    return namespace

1109 

@property
def class_(self) -> Type[_O]:
    """Return the mapped class ultimately represented by this
    :class:`.AliasedInsp`, taken from its mapper."""
    mapped_class = self.mapper.class_
    return mapped_class

1115 

@property
def _path_registry(self) -> _AbstractEntityRegistry:
    """Return the path registry for this entity.

    With ``_use_mapper_path`` set this is the mapper's own registry;
    otherwise a registry obtained from ``PathRegistry.per_mapper()``
    for this object.

    """
    if not self._use_mapper_path:
        return PathRegistry.per_mapper(self)
    return self.mapper._path_registry

1122 

def __getstate__(self) -> Dict[str, Any]:
    """Return the pickle state: the values ``__setstate__`` feeds back
    into ``__init__``, keyed by name.  The weakly held base alias is
    dereferenced here."""
    return dict(
        entity=self.entity,
        mapper=self.mapper,
        alias=self.selectable,
        name=self.name,
        adapt_on_names=self._adapt_on_names,
        with_polymorphic_mappers=self.with_polymorphic_mappers,
        with_polymorphic_discriminator=self.polymorphic_on,
        base_alias=self._base_alias(),
        use_mapper_path=self._use_mapper_path,
        represents_outer_join=self.represents_outer_join,
        nest_adapters=self._nest_adapters,
    )

1137 

def __setstate__(self, state: Dict[str, Any]) -> None:
    """Restore from pickle state by re-running ``__init__``.

    The state values are passed positionally; the key order below
    follows the parameter order of ``__init__``.

    """
    init_order = (
        "entity",
        "mapper",
        "alias",
        "name",
        "with_polymorphic_mappers",
        "with_polymorphic_discriminator",
        "base_alias",
        "use_mapper_path",
        "adapt_on_names",
        "represents_outer_join",
        "nest_adapters",
    )
    init_args = [state[key] for key in init_order]
    self.__init__(*init_args)  # type: ignore

1152 

def _merge_with(self, other: AliasedInsp[_O]) -> AliasedInsp[_O]:
    """Combine the polymorphic mappers of this entity and ``other``.

    ``other`` is returned unchanged when both cover the same set of
    classes.  Otherwise a new entity is built over the union of the
    classes, taking its remaining settings from ``other``.

    """
    # assert self._is_with_polymorphic
    # assert other._is_with_polymorphic

    primary_mapper = other.mapper
    assert self.mapper is primary_mapper

    our_classes = util.to_set(
        mp.class_ for mp in self.with_polymorphic_mappers
    )
    their_classes = {mp.class_ for mp in other.with_polymorphic_mappers}
    if our_classes == their_classes:
        return other

    merged_classes = our_classes.union(their_classes)
    outer = other.represents_outer_join

    mappers, selectable = primary_mapper._with_polymorphic_args(
        merged_classes, None, innerjoin=not outer
    )
    selectable = selectable._anonymous_fromclause(flat=True)

    merged = AliasedClass(
        primary_mapper,
        selectable,
        with_polymorphic_mappers=mappers,
        with_polymorphic_discriminator=other.polymorphic_on,
        use_mapper_path=other._use_mapper_path,
        represents_outer_join=other.represents_outer_join,
    )
    return merged._aliased_insp

1182 

def _adapt_element(
    self, expr: _ORMCOLEXPR, key: Optional[str] = None
) -> _ORMCOLEXPR:
    """Adapt a column expression to this entity's selectable.

    The result is annotated with this object as parent entity and its
    mapper as parent mapper, plus ``proxy_key`` when ``key`` is given.
    If the adapter cannot locate the expression, the expression is used
    unadapted and a warning is emitted.

    """
    assert isinstance(expr, ColumnElement)

    annotations: Dict[str, Any] = {
        "parententity": self,
        "parentmapper": self.mapper,
    }
    if key:
        annotations["proxy_key"] = key

    # userspace adapt of an attribute from AliasedClass; validate that
    # it actually was present
    adapted = self._adapter.adapt_check_present(expr)
    if adapted is None:
        adapted = expr
        if self._adapter.adapt_on_names:
            message = (
                "Did not locate an expression in selectable for "
                "attribute %r; ensure name is correct in expression"
            )
        else:
            message = (
                "Did not locate an expression in selectable for "
                "attribute %r; to match by name, use the "
                "adapt_on_names parameter"
            )
        util.warn_limited(message, (key,))

    annotated = adapted._annotate(annotations)
    return annotated._set_propagate_attrs(
        {"compile_state_plugin": "orm", "plugin_subject": self}
    )

1216 

if TYPE_CHECKING:
    # establish compatibility with the _ORMAdapterProto protocol,
    # which in turn is compatible with _CoreAdapterProto.

    def _orm_adapt_element(
        self,
        obj: _CE,
        key: Optional[str] = None,
    ) -> _CE: ...

else:
    # at runtime the name is a second reference to _adapt_element
    _orm_adapt_element = _adapt_element

1229 

def _entity_for_mapper(self, mapper):
    """Return the :class:`.AliasedInsp` that stands for ``mapper``.

    * the primary mapper resolves to this object;
    * another member of ``with_polymorphic_mappers`` resolves to the
      sub-entity stored on the AliasedClass under the mapped class name;
    * any other mapper for which ``mapper.isa(self.mapper)`` is true
      resolves to this object.

    :raises AssertionError: if ``mapper`` matches none of the above.
     This is raised explicitly rather than through ``assert False``, so
     that the check is still in force when Python runs with ``-O``
     instead of the method silently returning ``None``.

    """
    if mapper in self.with_polymorphic_mappers:
        if mapper is self.mapper:
            return self
        return getattr(self.entity, mapper.class_.__name__)._aliased_insp

    if mapper.isa(self.mapper):
        return self

    raise AssertionError(
        "mapper %s doesn't correspond to %s" % (mapper, self)
    )

1243 

def _memoized_attr__get_clause(self):
    """Return the mapper's ``_get_clause`` pair with the ON clause and
    the keys of the replacement map passed through this entity's
    adapter."""
    onclause, replacemap = self.mapper._get_clause
    adapted_onclause = self._adapter.traverse(onclause)
    adapted_map = {
        self._adapter.traverse(col): param
        for col, param in replacemap.items()
    }
    return adapted_onclause, adapted_map

1253 

def _memoized_attr__memoized_values(self):
    """Return a new, empty dictionary; ``_memo()`` stores its results
    in ``_memoized_values``."""
    return dict()

1256 

def _memoized_attr__all_column_expressions(self):
    """Return a column collection of the mapper's keyed columns, each
    column adapted to this entity.  For a with_polymorphic entity the
    mappers of the sub-entities are passed to the mapper as well."""
    if self._is_with_polymorphic:
        sub_mappers = [ent.mapper for ent in self._with_polymorphic_entities]
        keyed_columns = self.mapper._columns_plus_keys(sub_mappers)
    else:
        keyed_columns = self.mapper._columns_plus_keys()

    adapted = [(key, self._adapt_element(col)) for key, col in keyed_columns]
    return WriteableColumnCollection(adapted)

1270 

def _memo(self, key, callable_, *args, **kw):
    """Return the value memoized under ``key``; on first use it is
    computed as ``callable_(*args, **kw)`` and stored."""
    if key not in self._memoized_values:
        value = callable_(*args, **kw)
        self._memoized_values[key] = value
        return value
    return self._memoized_values[key]

1277 

def __repr__(self):
    """Return a debugging representation: object id, mapped class name
    and, in parenthesis, the class names of the polymorphic mappers."""
    poly_mappers = self.with_polymorphic_mappers
    with_poly = (
        "(%s)" % ", ".join(mp.class_.__name__ for mp in poly_mappers)
        if poly_mappers
        else ""
    )
    return "<AliasedInsp at 0x%x; %s%s>" % (
        id(self),
        self.class_.__name__,
        with_poly,
    )

1290 

def __str__(self):
    """Return ``aliased(<target>)``, or for a with_polymorphic entity
    ``with_polymorphic(<target>, [<names>])`` listing the polymorphic
    mappers other than the primary one."""
    target_name = self._target.__name__
    if not self._is_with_polymorphic:
        return "aliased(%s)" % (target_name,)

    sub_names = ", ".join(
        mp.class_.__name__
        for mp in self.with_polymorphic_mappers
        if mp is not self.mapper
    )
    return "with_polymorphic(%s, [%s])" % (target_name, sub_names)

1303 

1304 

class _WrapUserEntity:
    """Proxy handed to the loader_criteria lambda in place of the entity.

    Attribute access on the proxy bypasses ``declared_attr`` descriptors
    on unmapped mixins, which normally emit a warning for such use.

    might also be useful for other per-lambda instrumentations should
    the need arise.

    """

    __slots__ = ("subject",)

    def __init__(self, subject):
        self.subject = subject

    @util.preload_module("sqlalchemy.orm.decl_api")
    def __getattribute__(self, name):
        decl_api = util.preloaded.orm.decl_api

        # go through object.__getattribute__ so that reading "subject"
        # does not re-enter this method
        wrapped = object.__getattribute__(self, "subject")

        namespace = wrapped.__dict__
        if name in namespace:
            attr = namespace[name]
            if isinstance(attr, decl_api.declared_attr):
                # call the underlying function directly instead of
                # going through the descriptor
                return attr.fget(wrapped)

        return getattr(wrapped, name)

1331 

1332 

class LoaderCriteriaOption(CriteriaOption):
    """Add additional WHERE criteria to the load for all occurrences of
    a particular entity.

    :class:`_orm.LoaderCriteriaOption` is invoked using the
    :func:`_orm.with_loader_criteria` function; see that function for
    details.

    .. versionadded:: 1.4

    """

    __slots__ = (
        "root_entity",
        "entity",
        "deferred_where_criteria",
        "where_criteria",
        "_where_crit_orig",
        "include_aliases",
        "propagate_to_loaders",
    )

    _traverse_internals = [
        ("root_entity", visitors.ExtendedInternalTraversal.dp_plain_obj),
        ("entity", visitors.ExtendedInternalTraversal.dp_has_cache_key),
        ("where_criteria", visitors.InternalTraversal.dp_clauseelement),
        ("include_aliases", visitors.InternalTraversal.dp_boolean),
        ("propagate_to_loaders", visitors.InternalTraversal.dp_boolean),
    ]

    # exactly one of root_entity / entity is set; see __init__
    root_entity: Optional[Type[Any]]
    entity: Optional[_InternalEntityType[Any]]
    where_criteria: Union[ColumnElement[bool], lambdas.DeferredLambdaElement]
    deferred_where_criteria: bool
    include_aliases: bool
    propagate_to_loaders: bool

    # the criteria exactly as passed to __init__; used by __reduce__
    _where_crit_orig: Any

    def __init__(
        self,
        entity_or_base: _EntityType[Any],
        where_criteria: Union[
            _ColumnExpressionArgument[bool],
            Callable[[Any], _ColumnExpressionArgument[bool]],
        ],
        loader_only: bool = False,
        include_aliases: bool = False,
        propagate_to_loaders: bool = True,
        track_closure_variables: bool = True,
    ):
        """Construct a new :class:`_orm.LoaderCriteriaOption`.

        :param entity_or_base: an inspectable entity, or any other class.
         When inspection yields nothing the class is kept as
         ``root_entity`` and its subclasses are searched for mapped
         classes later on.
        :param where_criteria: a SQL expression, or a callable that
         receives the (wrapped) entity and returns one.  A callable is
         wrapped in a :class:`.DeferredLambdaElement`.
        :param loader_only: accepted, but not consulted by this
         constructor.
        :param include_aliases: stored as ``include_aliases``.
        :param propagate_to_loaders: stored as ``propagate_to_loaders``.
        :param track_closure_variables: passed to the lambda options when
         ``where_criteria`` is a callable.

        """
        entity = cast(
            "_InternalEntityType[Any]",
            inspection.inspect(entity_or_base, False),
        )
        if entity is None:
            self.root_entity = cast("Type[Any]", entity_or_base)
            self.entity = None
        else:
            self.root_entity = None
            self.entity = entity

        self._where_crit_orig = where_criteria
        if callable(where_criteria):
            if self.root_entity is not None:
                wrap_entity = self.root_entity
            else:
                assert entity is not None
                wrap_entity = entity.entity

            self.deferred_where_criteria = True
            self.where_criteria = lambdas.DeferredLambdaElement(
                where_criteria,
                roles.WhereHavingRole,
                lambda_args=(_WrapUserEntity(wrap_entity),),
                opts=lambdas.LambdaOptions(
                    track_closure_variables=track_closure_variables
                ),
            )
        else:
            self.deferred_where_criteria = False
            self.where_criteria = coercions.expect(
                roles.WhereHavingRole, where_criteria
            )

        self.include_aliases = include_aliases
        self.propagate_to_loaders = propagate_to_loaders

    @classmethod
    def _unreduce(
        cls, entity, where_criteria, include_aliases, propagate_to_loaders
    ):
        """Rebuild an option from the arguments emitted by
        ``__reduce__``."""
        return LoaderCriteriaOption(
            entity,
            where_criteria,
            include_aliases=include_aliases,
            propagate_to_loaders=propagate_to_loaders,
        )

    def __reduce__(self):
        """Reduce to the original constructor arguments for pickling.

        The entity is reduced to its ``class_`` and the criteria to the
        value originally passed in.  ``track_closure_variables`` is not
        part of the reduced state, so a reconstructed option uses the
        constructor default.

        """
        return (
            LoaderCriteriaOption._unreduce,
            (
                self.entity.class_ if self.entity else self.root_entity,
                self._where_crit_orig,
                self.include_aliases,
                self.propagate_to_loaders,
            ),
        )

    def _all_mappers(self) -> Iterator[Mapper[Any]]:
        """Iterate the mappers this option applies to.

        With an inspected entity these are its mapper and all
        descendants.  With an unmapped root class, the subclass tree is
        walked breadth-first; the first mapped class found on each
        branch contributes its mapper and descendants, and the walk does
        not descend further below it.

        """
        if self.entity:
            yield from self.entity.mapper.self_and_descendants
        else:
            assert self.root_entity

            # first-in-first-out queue; a deque pops from the left in
            # constant time, which list.pop(0) does not
            from collections import deque

            pending = deque(self.root_entity.__subclasses__())
            while pending:
                subclass = pending.popleft()
                ent = cast(
                    "_InternalEntityType[Any]",
                    inspection.inspect(subclass, raiseerr=False),
                )
                if ent:
                    yield from ent.mapper.self_and_descendants
                else:
                    pending.extend(subclass.__subclasses__())

    def _should_include(self, compile_state: _ORMCompileState) -> bool:
        """Return False only for a statement annotated as having been
        produced for this very option (``for_loader_criteria``), which
        is the annotation ``_resolve_where_criteria`` applies."""
        return (
            compile_state.select_statement._annotations.get(
                "for_loader_criteria", None
            )
            is not self
        )

    def _resolve_where_criteria(
        self, ext_info: _InternalEntityType[Any]
    ) -> ColumnElement[bool]:
        """Return the criteria for ``ext_info``, resolving a deferred
        lambda against its entity, deep-annotated with
        ``for_loader_criteria`` referring to this option."""
        if self.deferred_where_criteria:
            crit = cast(
                "ColumnElement[bool]",
                self.where_criteria._resolve_with_args(ext_info.entity),
            )
        else:
            crit = self.where_criteria  # type: ignore
        assert isinstance(crit, ColumnElement)
        return sql_util._deep_annotate(
            crit,
            {"for_loader_criteria": self},
            detect_subquery_cols=True,
            ind_cols_on_fromclause=True,
        )

    def process_compile_state_replaced_entities(
        self,
        compile_state: _ORMCompileState,
        mapper_entities: Iterable[_MapperEntity],
    ) -> None:
        """Same as :meth:`.process_compile_state`; ``mapper_entities``
        is not used."""
        self.process_compile_state(compile_state)

    def process_compile_state(self, compile_state: _ORMCompileState) -> None:
        """Apply a modification to a given :class:`.CompileState`."""

        # if options to limit the criteria to immediate query only,
        # use compile_state.attributes instead

        self.get_global_criteria(compile_state.global_attributes)

    def get_global_criteria(self, attributes: Dict[Any, Any]) -> None:
        """Append this option to the list stored in ``attributes`` under
        ``("additional_entity_criteria", mapper)`` for every mapper from
        ``_all_mappers()``, creating the list when absent."""
        for mp in self._all_mappers():
            load_criteria = attributes.setdefault(
                ("additional_entity_criteria", mp), []
            )

            load_criteria.append(self)

1510 

1511 

# register with the inspection system a function for AliasedClass which
# hands back the AliasedInsp stored on its ``_aliased_insp`` attribute
inspection._inspects(AliasedClass)(lambda target: target._aliased_insp)

1513 

1514 

@inspection._inspects(type)
def _inspect_mc(
    class_: Type[_O],
) -> Optional[Mapper[_O]]:
    """Inspection function for plain classes.

    Returns the mapper of ``class_`` when the class has a class manager
    that reports it as mapped.  Returns ``None`` otherwise, including
    when the lookup raises one of the ``orm_exc.NO_STATE`` exceptions.

    """
    try:
        manager = opt_manager_of_class(class_)
        if manager is not None and manager.is_mapped:
            return manager.mapper
    except orm_exc.NO_STATE:
        pass
    return None

1528 

1529 

# the runtime type of subscripted typing generics such as List[Any]
GenericAlias = type(List[Any])


@inspection._inspects(GenericAlias)
def _inspect_generic_alias(
    class_: Type[_O],
) -> Optional[Mapper[_O]]:
    """Inspection function for subscripted generics: inspect the origin
    class of the alias in the same way as a plain class."""
    return _inspect_mc(cast("Type[_O]", get_origin(class_)))

1539 

1540 

@inspection._self_inspects
class Bundle(
    ORMColumnsClauseRole[_T],
    SupportsCloneAnnotations,
    MemoizedHasCacheKey,
    inspection.Inspectable["Bundle[_T]"],
    InspectionAttr,
):
    """A grouping of SQL expressions that are returned by a :class:`.Query`
    under one namespace.

    The :class:`.Bundle` essentially allows nesting of the tuple-based
    results returned by a column-oriented :class:`_query.Query` object.
    It also
    is extensible via simple subclassing, where the primary capability
    to override is that of how the set of expressions should be returned,
    allowing post-processing as well as custom return types, without
    involving ORM identity-mapped classes.

    .. seealso::

        :ref:`bundles`

        :class:`.DictBundle`

    """

    single_entity = False
    """If True, queries for a single Bundle will be returned as a single
    entity, rather than an element within a keyed tuple."""

    # constant flags describing what kind of inspected object this is
    is_clause_element = False

    is_mapper = False

    is_aliased_class = False

    is_bundle = True

    _propagate_attrs: _PropagateAttrsType = util.immutabledict()

    proxy_set = util.EMPTY_SET

    # the coerced expressions, in the order given to the constructor
    exprs: List[_ColumnsClauseElement]

    def __init__(
        self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
    ) -> None:
        r"""Construct a new :class:`.Bundle`.

        e.g.::

            bn = Bundle("mybundle", MyClass.x, MyClass.y)

            for row in session.query(bn).filter(bn.c.x == 5).filter(bn.c.y == 4):
                print(row.mybundle.x, row.mybundle.y)

        :param name: name of the bundle.
        :param \*exprs: columns or SQL expressions comprising the bundle.
        :param single_entity=False: if True, rows for this :class:`.Bundle`
         can be returned as a "single entity" outside of any enclosing tuple
         in the same manner as a mapped entity.

        """  # noqa: E501
        self.name = self._label = name
        coerced_exprs = [
            coercions.expect(
                roles.ColumnsClauseRole, expr, apply_propagate_attrs=self
            )
            for expr in exprs
        ]
        self.exprs = coerced_exprs

        # each column is keyed by its "key" attribute, falling back to
        # "_label"; an expression carrying a "bundle" annotation is
        # replaced by the annotated bundle, which allows nesting
        self.c = self.columns = WriteableColumnCollection(
            (getattr(col, "key", col._label), col)
            for col in [e._annotations.get("bundle", e) for e in coerced_exprs]
        ).as_readonly()
        # "single_entity" is the only keyword read here; any other
        # keyword arguments are ignored
        self.single_entity = kw.pop("single_entity", self.single_entity)

    def _gen_cache_key(
        self, anon_map: anon_map, bindparams: List[BindParameter[Any]]
    ) -> Tuple[Any, ...]:
        """Return a cache key tuple made of the class, the name, the
        ``single_entity`` flag and the cache key of each expression."""
        return (self.__class__, self.name, self.single_entity) + tuple(
            [expr._gen_cache_key(anon_map, bindparams) for expr in self.exprs]
        )

    @property
    def mapper(self) -> Optional[Mapper[Any]]:
        """The "parentmapper" annotation of the first expression, or
        ``None`` if that expression has none.

        Indexes ``exprs[0]``, so a bundle without expressions raises
        ``IndexError``.

        """
        mp: Optional[Mapper[Any]] = self.exprs[0]._annotations.get(
            "parentmapper", None
        )
        return mp

    @property
    def entity(self) -> Optional[_InternalEntityType[Any]]:
        """The "parententity" annotation of the first expression, or
        ``None`` if that expression has none.

        Indexes ``exprs[0]``, so a bundle without expressions raises
        ``IndexError``.

        """
        ie: Optional[_InternalEntityType[Any]] = self.exprs[
            0
        ]._annotations.get("parententity", None)
        return ie

    @property
    def entity_namespace(
        self,
    ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]:
        """The attribute namespace of the bundle, which is
        :attr:`.Bundle.c`."""
        return self.c

    columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]

    """A namespace of SQL expressions referred to by this :class:`.Bundle`.

        e.g.::

            bn = Bundle("mybundle", MyClass.x, MyClass.y)

            q = sess.query(bn).filter(bn.c.x == 5)

        Nesting of bundles is also supported::

            b1 = Bundle(
                "b1",
                Bundle("b2", MyClass.a, MyClass.b),
                Bundle("b3", MyClass.x, MyClass.y),
            )

            q = sess.query(b1).filter(b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9)

    .. seealso::

        :attr:`.Bundle.c`

    """  # noqa: E501

    c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
    """An alias for :attr:`.Bundle.columns`."""

    def _clone(self, **kw):
        """Return a shallow copy: a new instance of the same class,
        created without calling ``__init__``, whose ``__dict__`` is
        filled from this one.  Keyword arguments are ignored."""
        cloned = self.__class__.__new__(self.__class__)
        cloned.__dict__.update(self.__dict__)
        return cloned

    def __clause_element__(self):
        """Return a :class:`.ClauseList` of the bundle's expressions,
        annotated with this bundle and set up to use the "orm" compile
        state plugin."""
        # ensure existing entity_namespace remains
        annotations = {"bundle": self, "entity_namespace": self}
        annotations.update(self._annotations)

        # prefer the plugin subject of the first expression; fall back
        # to the parent entity of the bundle
        plugin_subject = self.exprs[0]._propagate_attrs.get(
            "plugin_subject", self.entity
        )
        return (
            expression.ClauseList(
                _literal_as_text_role=roles.ColumnsClauseRole,
                group=False,
                *[e._annotations.get("bundle", e) for e in self.exprs],
            )
            ._annotate(annotations)
            ._set_propagate_attrs(
                # the Bundle *must* use the orm plugin no matter what.  the
                # subject can be None but it's much better if it's not.
                {
                    "compile_state_plugin": "orm",
                    "plugin_subject": plugin_subject,
                }
            )
        )

    @property
    def clauses(self):
        """The ``clauses`` of a newly generated clause element; see
        ``__clause_element__()``."""
        return self.__clause_element__().clauses

    def label(self, name):
        """Provide a copy of this :class:`.Bundle` passing a new label."""

        # NOTE(review): only ``name`` is replaced on the copy; ``_label``
        # keeps the value assigned in __init__ -- confirm this is intended
        cloned = self._clone()
        cloned.name = name
        return cloned

    def create_row_processor(
        self,
        query: Select[Unpack[TupleAny]],
        procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]],
        labels: Sequence[str],
    ) -> Callable[[Row[Unpack[TupleAny]]], Any]:
        """Produce the "row processing" function for this :class:`.Bundle`.

        May be overridden by subclasses to provide custom behaviors when
        results are fetched. The method is passed the statement object and a
        set of "row processor" functions at query execution time; these
        processor functions when given a result row will return the individual
        attribute value, which can then be adapted into any kind of return data
        structure.

        The example below illustrates replacing the usual :class:`.Row`
        return structure with a straight Python dictionary::

            from sqlalchemy.orm import Bundle


            class DictBundle(Bundle):
                def create_row_processor(self, query, procs, labels):
                    "Override create_row_processor to return values as dictionaries"

                    def proc(row):
                        return dict(zip(labels, (proc(row) for proc in procs)))

                    return proc

        A result from the above :class:`_orm.Bundle` will return dictionary
        values::

            bn = DictBundle("mybundle", MyClass.data1, MyClass.data2)
            for row in session.execute(select(bn).where(bn.c.data1 == "d1")):
                print(row.mybundle["data1"], row.mybundle["data2"])

        The above example is available natively using :class:`.DictBundle`.

        :param query: the statement object; not used by this default
         implementation.
        :param procs: one row processor function per expression.
        :param labels: the labels used as field names of the result.
        :return: a function that turns a row into a keyed tuple of the
         values produced by ``procs``.

        .. seealso::

            :class:`.DictBundle`

        """  # noqa: E501
        # one empty tuple is passed per label as the second argument
        keyed_tuple = result_tuple(labels, [() for l in labels])

        def proc(row: Row[Unpack[TupleAny]]) -> Any:
            return keyed_tuple([proc(row) for proc in procs])

        return proc

1767 

1768 

class DictBundle(Bundle[_T]):
    """Like :class:`.Bundle` but returns ``dict`` instances instead of
    named tuple like objects::

        bn = DictBundle("mybundle", MyClass.data1, MyClass.data2)
        for row in session.execute(select(bn).where(bn.c.data1 == "d1")):
            print(row.mybundle["data1"], row.mybundle["data2"])

    Differently from :class:`.Bundle`, multiple columns with the same name are
    not supported.

    .. versionadded:: 2.1

    .. seealso::

        :ref:`bundles`

        :class:`.Bundle`
    """

    def __init__(
        self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
    ) -> None:
        """Construct a new :class:`.DictBundle`; accepts the same
        arguments as :class:`.Bundle`.

        :raises sqlalchemy.exc.ArgumentError: if two columns share a key.

        """
        super().__init__(name, *exprs, **kw)

        distinct_keys = set(self.c.keys())
        if len(distinct_keys) != len(self.c):
            raise sa_exc.ArgumentError(
                "DictBundle does not support duplicate column names"
            )

    def create_row_processor(
        self,
        query: Select[Unpack[TupleAny]],
        procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]],
        labels: Sequence[str],
    ) -> Callable[[Row[Unpack[TupleAny]]], dict[str, Any]]:
        """Return a row processor that produces a dictionary mapping
        each label to the value of the matching processor function."""

        def proc(row: Row[Unpack[TupleAny]]) -> dict[str, Any]:
            return {
                label: getter(row) for label, getter in zip(labels, procs)
            }

        return proc

1808 

1809 

def _orm_full_deannotate(element: _SA) -> _SA:
    """Return ``element`` with annotations removed, as produced by
    ``sql_util._deep_deannotate()`` called without further arguments."""
    deannotated = sql_util._deep_deannotate(element)
    return deannotated

1812 

1813 

1814class _ORMJoin(expression.Join): 

1815 """Extend Join to support ORM constructs as input.""" 

1816 

# same visit name as the plain Join which this class extends
__visit_name__ = expression.Join.__visit_name__

# NOTE(review): presumably opts this subclass into the cache key scheme
# of its parent class -- confirm against the SQL caching documentation
inherit_cache = True

1820 

def __init__(
    self,
    left: _FromClauseArgument,
    right: _FromClauseArgument,
    onclause: Optional[_OnClauseArgument] = None,
    isouter: bool = False,
    full: bool = False,
    _left_memo: Optional[Any] = None,
    _right_memo: Optional[Any] = None,
    _extra_criteria: Tuple[ColumnElement[bool], ...] = (),
):
    """Construct a join between two ORM or Core FROM arguments.

    :param left: left side of the join.
    :param right: right side of the join.
    :param onclause: a SQL expression, a relationship-bound attribute,
     a :class:`.MapperProperty`, or ``None``.  For an attribute or
     property the join condition comes from ``prop._create_joins()``.
    :param isouter: passed on to :class:`.Join`.
    :param full: passed on to :class:`.Join`.
    :param _left_memo: stored as ``_left_memo``; used by the joined
     eager loader.
    :param _right_memo: stored as ``_right_memo``; used by the joined
     eager loader.
    :param _extra_criteria: additional criteria; passed to
     ``_create_joins()`` when a property is present, otherwise AND-ed
     onto the ON clause.

    """
    left_info = cast(
        "Union[FromClause, _InternalEntityType[Any]]",
        inspection.inspect(left),
    )

    right_info = cast(
        "Union[FromClause, _InternalEntityType[Any]]",
        inspection.inspect(right),
    )
    adapt_to = right_info.selectable

    # used by joined eager loader
    self._left_memo = _left_memo
    self._right_memo = _right_memo

    # work out the relationship property, if any, and the selectable
    # which the ON clause originates from
    if isinstance(onclause, attributes.QueryableAttribute):
        if TYPE_CHECKING:
            assert isinstance(
                onclause.comparator, RelationshipProperty.Comparator
            )
        on_selectable = onclause.comparator._source_selectable()
        prop = onclause.property
        # tuple concatenation; rebinds the local name only
        _extra_criteria += onclause._extra_criteria
    elif isinstance(onclause, MapperProperty):
        # used internally by joined eager loader...possibly not ideal
        prop = onclause
        on_selectable = prop.parent.selectable
    else:
        prop = None
        on_selectable = None

    left_selectable = left_info.selectable
    if prop:
        adapt_from: Optional[FromClause]
        # join from the attribute's own selectable when it is present
        # in the left side, otherwise from the left side as a whole
        if sql_util.clause_is_present(on_selectable, left_selectable):
            adapt_from = on_selectable
        else:
            assert isinstance(left_selectable, FromClause)
            adapt_from = left_selectable

        (
            pj,
            sj,
            source,
            dest,
            secondary,
            target_adapter,
        ) = prop._create_joins(
            source_selectable=adapt_from,
            dest_selectable=adapt_to,
            source_polymorphic=True,
            of_type_entity=right_info,
            alias_secondary=True,
            extra_criteria=_extra_criteria,
        )

        # a secondary join is present: splice the secondary table into
        # the right side (outer join) or the left side (inner join)
        if sj is not None:
            if isouter:
                # note this is an inner join from secondary->right
                right = sql.join(secondary, right, sj)
                onclause = pj
            else:
                left = sql.join(left, secondary, pj, isouter)
                onclause = sj
        else:
            onclause = pj

        # only assigned when a property is present
        self._target_adapter = target_adapter

    # we don't use the normal coercions logic for _ORMJoin
    # (probably should), so do some gymnastics to get the entity.
    # logic here is for #8721, which was a major bug in 1.4
    # for almost two years, not reported/fixed until 1.4.43 (!)
    if is_selectable(left_info):
        parententity = left_selectable._annotations.get(
            "parententity", None
        )
    elif insp_is_mapper(left_info) or insp_is_aliased_class(left_info):
        parententity = left_info
    else:
        parententity = None

    if parententity is not None:
        self._annotations = self._annotations.union(
            {"parententity": parententity}
        )

    # without a property, the extra criteria have not been applied by
    # _create_joins() and are AND-ed onto the ON clause below
    augment_onclause = bool(_extra_criteria) and not prop
    expression.Join.__init__(self, left, right, onclause, isouter, full)

    assert self.onclause is not None

    if augment_onclause:
        self.onclause &= sql.and_(*_extra_criteria)

    if (
        not prop
        and getattr(right_info, "mapper", None)
        and right_info.mapper.single  # type: ignore
    ):
        right_info = cast("_InternalEntityType[Any]", right_info)
        # if single inheritance target and we are using a manual
        # or implicit ON clause, augment it the same way we'd augment the
        # WHERE.
        single_crit = right_info.mapper._single_table_criterion
        if single_crit is not None:
            if insp_is_aliased_class(right_info):
                single_crit = right_info._adapter.traverse(single_crit)
            self.onclause = self.onclause & single_crit

1941 

def _splice_into_center(self, other):
    """Splice ``other`` onto the right-hand side of this join.

    With ``self`` as ``join(a, b)`` and ``other`` as ``join(b, c)``, the
    result is the equivalent of ``join(a, b).join(c)``.

    The leftmost element of ``other`` is expected to be the very same
    object as ``self.right``; this is asserted.

    """
    # walk down the left side of ``other`` until a non-join is reached
    anchor = other
    while isinstance(anchor, sql.Join):
        anchor = anchor.left

    assert self.right is anchor

    # rebuild the left portion: self.left joined to other.left using this
    # join's own ON clause and outer-ness
    spliced_left = _ORMJoin(
        self.left,
        other.left,
        self.onclause,
        isouter=self.isouter,
        _left_memo=self._left_memo,
        _right_memo=other._left_memo._path_registry,
    )

    # then attach the right side of ``other`` using its ON clause
    return _ORMJoin(
        spliced_left,
        other.right,
        other.onclause,
        isouter=other.isouter,
        _right_memo=other._right_memo,
    )

1970 

def join(
    self,
    right: _FromClauseArgument,
    onclause: Optional[_OnClauseArgument] = None,
    isouter: bool = False,
    full: bool = False,
) -> _ORMJoin:
    """Return a new ``_ORMJoin`` that has this join as its left side.

    :param right: the right side of the new join.
    :param onclause: optional ON clause, passed through unchanged.
    :param isouter: passed through as the ``isouter`` flag.
    :param full: passed through as the ``full`` flag.

    """
    joined = _ORMJoin(self, right, onclause, isouter=isouter, full=full)
    return joined

1979 

def outerjoin(
    self,
    right: _FromClauseArgument,
    onclause: Optional[_OnClauseArgument] = None,
    full: bool = False,
) -> _ORMJoin:
    """Return a new ``_ORMJoin`` with this join as its left side and
    ``isouter`` set to ``True``.

    :param right: the right side of the new join.
    :param onclause: optional ON clause, passed through unchanged.
    :param full: passed through as the ``full`` flag.

    """
    outer_joined = _ORMJoin(self, right, onclause, full=full, isouter=True)
    return outer_joined

1987 

1988 

def with_parent(
    instance: object,
    prop: attributes.QueryableAttribute[Any],
    from_entity: Optional[_EntityType[Any]] = None,
) -> ColumnElement[bool]:
    """Produce filtering criteria relating a query's primary entity to
    the given parent instance, based on established
    :func:`_orm.relationship()` configuration.

    E.g.::

        stmt = select(Address).where(with_parent(some_user, User.addresses))

    The SQL rendered is the same as what a lazy loader would emit when
    firing off from the given parent on that attribute; the needed state
    is taken from the parent object in Python, so no join to the parent
    table is rendered in the statement.

    The left side of the criteria may be indicated by applying
    :meth:`_orm.PropComparator.of_type` to the given attribute::

        a1 = aliased(Address)
        a2 = aliased(Address)
        stmt = select(a1, a2).where(
            with_parent(u1, User.addresses.of_type(a2))
        )

    which is equivalent to passing the ``from_entity`` argument::

        a1 = aliased(Address)
        a2 = aliased(Address)
        stmt = select(a1, a2).where(
            with_parent(u1, User.addresses, from_entity=a2)
        )

    :param instance:
      An instance which has some :func:`_orm.relationship`.

    :param prop:
      Class-bound attribute indicating which relationship from the
      instance should be used to reconcile the parent/child relationship.
      Strings are rejected with ``ArgumentError``, as is a class-bound
      attribute whose property is not a relationship.

    :param from_entity:
      Entity to consider as the left side.  This defaults to the
      "zero" entity of the :class:`_query.Query` itself.  When ``prop``
      carries an ``of_type()`` entity, that entity is used instead.

    """
    if isinstance(prop, str):
        raise sa_exc.ArgumentError(
            "with_parent() accepts class-bound mapped attributes, not strings"
        )

    if not isinstance(prop, attributes.QueryableAttribute):
        # anything else is used directly as the relationship property
        return prop._with_parent(instance, from_entity=from_entity)

    # an of_type() entity on the attribute takes the place of from_entity
    if prop._of_type:
        from_entity = prop._of_type

    relationship_prop = prop.property
    if relationship_prop is None or not prop_is_relationship(
        relationship_prop
    ):
        raise sa_exc.ArgumentError(
            f"Expected relationship property for with_parent(), "
            f"got {relationship_prop}"
        )

    return relationship_prop._with_parent(instance, from_entity=from_entity)

2061 

2062 

def has_identity(object_: object) -> bool:
    """Report whether the given object has a database identity.

    An object with an identity is typically in either the persistent
    or the detached state.

    .. seealso::

        :func:`.was_deleted`

    """
    # the answer is read straight from the object's instance state
    return attributes.instance_state(object_).has_identity

2077 

2078 

def was_deleted(object_: object) -> bool:
    """Report whether the given object was deleted within a session
    flush.

    The result does not depend on whether the object is persistent
    or detached.

    .. seealso::

        :attr:`.InstanceState.was_deleted`

    """
    # the answer is read straight from the object's instance state
    return attributes.instance_state(object_).was_deleted

2094 

2095 

def _entity_corresponds_to(
    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
) -> bool:
    """Decide whether ``given`` corresponds to ``entity``, in the sense of
    an entity passed to Query matching the same entity being referred to
    elsewhere in the query.

    """
    if insp_is_aliased_class(entity):
        # an aliased entity only matches another alias sharing the same
        # base alias
        if not insp_is_aliased_class(given):
            return False
        return entity._base_alias() is given._base_alias()

    if insp_is_aliased_class(given):
        # plain entity compared against an aliased "given"
        return (
            entity in given.with_polymorphic_mappers
            if given._use_mapper_path
            else entity is given
        )

    # neither side is aliased
    assert insp_is_mapper(given)
    return entity.common_parent(given)

2117 

2118 

def _entity_corresponds_to_use_path_impl(
    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
) -> bool:
    """Decide whether ``given`` corresponds to ``entity`` in terms of a
    path of loader options, where a mapped attribute is taken to be a
    member of a parent entity.

    e.g.::

        someoption(A).someoption(A.b)  # -> fn(A, A) -> True
        someoption(A).someoption(C.d)  # -> fn(A, C) -> False

        a1 = aliased(A)
        someoption(a1).someoption(A.b)  # -> fn(a1, A) -> False
        someoption(a1).someoption(a1.b)  # -> fn(a1, a1) -> True

        wp = with_polymorphic(A, [A1, A2])
        someoption(wp).someoption(A1.foo)  # -> fn(wp, A1) -> False
        someoption(wp).someoption(wp.A1.foo)  # -> fn(wp, wp.A1) -> True

    """
    given_is_alias = insp_is_aliased_class(given)
    entity_is_alias = insp_is_aliased_class(entity)

    if given_is_alias:
        # an aliased "given" only matches an aliased entity that is not
        # using the mapper path, and that is either the same alias or one
        # of its with_polymorphic entities
        if not entity_is_alias or entity._use_mapper_path:
            return False
        return given is entity or entity in given._with_polymorphic_entities

    if not entity_is_alias:
        # both sides are plain (non-aliased)
        return given.isa(entity.mapper)

    # plain "given" against an aliased entity
    return (
        entity._use_mapper_path
        and given in entity.with_polymorphic_mappers
    )

2153 

2154 

2155def _entity_isa(given: _InternalEntityType[Any], mapper: Mapper[Any]) -> bool: 

2156 """determine if 'given' "is a" mapper, in terms of the given 

2157 would load rows of type 'mapper'. 

2158 

2159 """ 

2160 if given.is_aliased_class: 

2161 return mapper in given.with_polymorphic_mappers or given.mapper.isa( 

2162 mapper 

2163 ) 

2164 elif given.with_polymorphic_mappers: 

2165 return mapper in given.with_polymorphic_mappers or given.isa(mapper) 

2166 else: 

2167 return given.isa(mapper) 

2168 

2169 

2170def _getitem(iterable_query: Query[Any], item: Any) -> Any: 

2171 """calculate __getitem__ in terms of an iterable query object 

2172 that also has a slice() method. 

2173 

2174 """ 

2175 

2176 def _no_negative_indexes(): 

2177 raise IndexError( 

2178 "negative indexes are not accepted by SQL " 

2179 "index / slice operators" 

2180 ) 

2181 

2182 if isinstance(item, slice): 

2183 start, stop, step = util.decode_slice(item) 

2184 

2185 if ( 

2186 isinstance(stop, int) 

2187 and isinstance(start, int) 

2188 and stop - start <= 0 

2189 ): 

2190 return [] 

2191 

2192 elif (isinstance(start, int) and start < 0) or ( 

2193 isinstance(stop, int) and stop < 0 

2194 ): 

2195 _no_negative_indexes() 

2196 

2197 res = iterable_query.slice(start, stop) 

2198 if step is not None: 

2199 return list(res)[None : None : item.step] 

2200 else: 

2201 return list(res) 

2202 else: 

2203 if item == -1: 

2204 _no_negative_indexes() 

2205 else: 

2206 return list(iterable_query[item : item + 1])[0] 

2207 

2208 

def _is_mapped_annotation(
    raw_annotation: _AnnotationScanType,
    cls: Type[Any],
    originating_cls: Type[Any],
) -> bool:
    """Return whether ``raw_annotation``, once de-stringified against the
    module of ``originating_cls``, has ``_MappedAnnotationBase`` as its
    origin.

    An annotation that fails to resolve with ``NameError`` is reported as
    not mapped.

    """
    try:
        resolved = de_stringify_annotation(
            cls, raw_annotation, originating_cls.__module__
        )
    except NameError:
        # in most cases, at least within our own tests, we can raise
        # here, which is more accurate as it prevents us from returning
        # false negatives.  However, in the real world, try to avoid getting
        # involved with end-user annotations that have nothing to do with us.
        # see issue #8888 where we bypass using this function in the case
        # that we want to detect an unresolvable Mapped[] type.
        return False

    return is_origin_of_cls(resolved, _MappedAnnotationBase)

2228 

2229 

2230class _CleanupError(Exception): 

2231 pass 

2232 

2233 

2234def _cleanup_mapped_str_annotation( 

2235 annotation: str, originating_module: str 

2236) -> str: 

2237 # fix up an annotation that comes in as the form: 

2238 # 'Mapped[List[Address]]' so that it instead looks like: 

2239 # 'Mapped[List["Address"]]' , which will allow us to get 

2240 # "Address" as a string 

2241 

2242 # additionally, resolve symbols for these names since this is where 

2243 # we'd have to do it 

2244 

2245 inner: Optional[Match[str]] 

2246 

2247 mm = re.match(r"^([^ \|]+?)\[(.+)\]$", annotation) 

2248 

2249 if not mm: 

2250 return annotation 

2251 

2252 # ticket #8759. Resolve the Mapped name to a real symbol. 

2253 # originally this just checked the name. 

2254 try: 

2255 obj = eval_name_only(mm.group(1), originating_module) 

2256 except NameError as ne: 

2257 raise _CleanupError( 

2258 f'For annotation "{annotation}", could not resolve ' 

2259 f'container type "{mm.group(1)}". ' 

2260 "Please ensure this type is imported at the module level " 

2261 "outside of TYPE_CHECKING blocks" 

2262 ) from ne 

2263 

2264 if obj is typing.ClassVar: 

2265 real_symbol = "ClassVar" 

2266 else: 

2267 try: 

2268 if issubclass(obj, _MappedAnnotationBase): 

2269 real_symbol = obj.__name__ 

2270 else: 

2271 return annotation 

2272 except TypeError: 

2273 # avoid isinstance(obj, type) check, just catch TypeError 

2274 return annotation 

2275 

2276 # note: if one of the codepaths above didn't define real_symbol and 

2277 # then didn't return, real_symbol raises UnboundLocalError 

2278 # which is actually a NameError, and the calling routines don't 

2279 # notice this since they are catching NameError anyway. Just in case 

2280 # this is being modified in the future, something to be aware of. 

2281 

2282 stack = [] 

2283 inner = mm 

2284 while True: 

2285 stack.append(real_symbol if mm is inner else inner.group(1)) 

2286 g2 = inner.group(2) 

2287 inner = re.match(r"^([^ \|]+?)\[(.+)\]$", g2) 

2288 if inner is None: 

2289 stack.append(g2) 

2290 break 

2291 

2292 # stacks we want to rewrite, that is, quote the last entry which 

2293 # we think is a relationship class name: 

2294 # 

2295 # ['Mapped', 'List', 'Address'] 

2296 # ['Mapped', 'A'] 

2297 # 

2298 # stacks we dont want to rewrite, which are generally MappedColumn 

2299 # use cases: 

2300 # 

2301 # ['Mapped', "'Optional[Dict[str, str]]'"] 

2302 # ['Mapped', 'dict[str, str] | None'] 

2303 

2304 if ( 

2305 # avoid already quoted symbols such as 

2306 # ['Mapped', "'Optional[Dict[str, str]]'"] 

2307 not re.match(r"""^["'].*["']$""", stack[-1]) 

2308 # avoid further generics like Dict[] such as 

2309 # ['Mapped', 'dict[str, str] | None'], 

2310 # ['Mapped', 'list[int] | list[str]'], 

2311 # ['Mapped', 'Union[list[int], list[str]]'], 

2312 and not re.search(r"[\[\]]", stack[-1]) 

2313 ): 

2314 stripchars = "\"' " 

2315 stack[-1] = ", ".join( 

2316 f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",") 

2317 ) 

2318 

2319 annotation = "[".join(stack) + ("]" * (len(stack) - 1)) 

2320 

2321 return annotation 

2322 

2323 

2324def _extract_mapped_subtype( 

2325 raw_annotation: Optional[_AnnotationScanType], 

2326 cls: type, 

2327 originating_module: str, 

2328 key: str, 

2329 attr_cls: Type[Any], 

2330 required: bool, 

2331 is_dataclass_field: bool, 

2332 expect_mapped: bool = True, 

2333 raiseerr: bool = True, 

2334) -> Optional[Tuple[Union[_AnnotationScanType, str], Optional[type]]]: 

2335 """given an annotation, figure out if it's ``Mapped[something]`` and if 

2336 so, return the ``something`` part. 

2337 

2338 Includes error raise scenarios and other options. 

2339 

2340 """ 

2341 

2342 if raw_annotation is None: 

2343 if required: 

2344 raise orm_exc.MappedAnnotationError( 

2345 f"Python typing annotation is required for attribute " 

2346 f'"{cls.__name__}.{key}" when primary argument(s) for ' 

2347 f'"{attr_cls.__name__}" construct are None or not present' 

2348 ) 

2349 return None 

2350 

2351 try: 

2352 # destringify the "outside" of the annotation. note we are not 

2353 # adding include_generic so it will *not* dig into generic contents, 

2354 # which will remain as ForwardRef or plain str under future annotations 

2355 # mode. The full destringify happens later when mapped_column goes 

2356 # to do a full lookup in the registry type_annotations_map. 

2357 annotated = de_stringify_annotation( 

2358 cls, 

2359 raw_annotation, 

2360 originating_module, 

2361 str_cleanup_fn=_cleanup_mapped_str_annotation, 

2362 ) 

2363 except _CleanupError as ce: 

2364 raise orm_exc.MappedAnnotationError( 

2365 f"Could not interpret annotation {raw_annotation}. " 

2366 "Check that it uses names that are correctly imported at the " 

2367 "module level. See chained stack trace for more hints." 

2368 ) from ce 

2369 except NameError as ne: 

2370 if raiseerr and "Mapped[" in raw_annotation: # type: ignore 

2371 raise orm_exc.MappedAnnotationError( 

2372 f"Could not interpret annotation {raw_annotation}. " 

2373 "Check that it uses names that are correctly imported at the " 

2374 "module level. See chained stack trace for more hints." 

2375 ) from ne 

2376 

2377 annotated = raw_annotation # type: ignore 

2378 

2379 if is_dataclass_field: 

2380 return annotated, None 

2381 else: 

2382 if not hasattr(annotated, "__origin__") or not is_origin_of_cls( 

2383 annotated, _MappedAnnotationBase 

2384 ): 

2385 if expect_mapped: 

2386 if not raiseerr: 

2387 return None 

2388 

2389 origin = getattr(annotated, "__origin__", None) 

2390 if origin is typing.ClassVar: 

2391 return None 

2392 

2393 # check for other kind of ORM descriptor like AssociationProxy, 

2394 # don't raise for that (issue #9957) 

2395 elif isinstance(origin, type) and issubclass( 

2396 origin, ORMDescriptor 

2397 ): 

2398 return None 

2399 

2400 raise orm_exc.MappedAnnotationError( 

2401 f'Type annotation for "{cls.__name__}.{key}" ' 

2402 "can't be correctly interpreted for " 

2403 "Annotated Declarative Table form. ORM annotations " 

2404 "should normally make use of the ``Mapped[]`` generic " 

2405 "type, or other ORM-compatible generic type, as a " 

2406 "container for the actual type, which indicates the " 

2407 "intent that the attribute is mapped. " 

2408 "Class variables that are not intended to be mapped " 

2409 "by the ORM should use ClassVar[]. " 

2410 "To allow Annotated Declarative to disregard legacy " 

2411 "annotations which don't use Mapped[] to pass, set " 

2412 '"__allow_unmapped__ = True" on the class or a ' 

2413 "superclass this class.", 

2414 code="zlpr", 

2415 ) 

2416 

2417 else: 

2418 return annotated, None 

2419 

2420 generic_annotated = cast(GenericProtocol[Any], annotated) 

2421 if len(generic_annotated.__args__) != 1: 

2422 raise orm_exc.MappedAnnotationError( 

2423 "Expected sub-type for Mapped[] annotation" 

2424 ) 

2425 

2426 return ( 

2427 # fix dict/list/set args to be ForwardRef, see #11814 

2428 fixup_container_fwd_refs(generic_annotated.__args__[0]), 

2429 generic_annotated.__origin__, 

2430 ) 

2431 

2432 

def _mapper_property_as_plain_name(prop: Type[Any]) -> str:
    """Return the plain name for ``prop`` as produced by
    ``util.clsname_as_plain_name``.

    When ``prop`` defines ``_mapper_property_name()``, its result is
    passed along as the name; otherwise ``None`` is passed.

    """
    explicit_name = (
        prop._mapper_property_name()
        if hasattr(prop, "_mapper_property_name")
        else None
    )
    return util.clsname_as_plain_name(prop, explicit_name)