Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/util.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

819 statements  

1# orm/util.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9from __future__ import annotations 

10 

11import enum 

12import functools 

13import re 

14import types 

15import typing 

16from typing import AbstractSet 

17from typing import Any 

18from typing import Callable 

19from typing import cast 

20from typing import Dict 

21from typing import FrozenSet 

22from typing import Generic 

23from typing import Iterable 

24from typing import Iterator 

25from typing import List 

26from typing import Match 

27from typing import Optional 

28from typing import Protocol 

29from typing import Sequence 

30from typing import Tuple 

31from typing import Type 

32from typing import TYPE_CHECKING 

33from typing import TypeVar 

34from typing import Union 

35import weakref 

36 

37from . import attributes # noqa 

38from . import exc 

39from . import exc as orm_exc 

40from ._typing import _O 

41from ._typing import insp_is_aliased_class 

42from ._typing import insp_is_mapper 

43from ._typing import prop_is_relationship 

44from .base import _class_to_mapper as _class_to_mapper 

45from .base import _MappedAnnotationBase 

46from .base import _never_set as _never_set # noqa: F401 

47from .base import _none_only_set as _none_only_set # noqa: F401 

48from .base import _none_set as _none_set # noqa: F401 

49from .base import attribute_str as attribute_str # noqa: F401 

50from .base import class_mapper as class_mapper 

51from .base import DynamicMapped 

52from .base import InspectionAttr as InspectionAttr 

53from .base import instance_str as instance_str # noqa: F401 

54from .base import Mapped 

55from .base import object_mapper as object_mapper 

56from .base import object_state as object_state # noqa: F401 

57from .base import opt_manager_of_class 

58from .base import ORMDescriptor 

59from .base import state_attribute_str as state_attribute_str # noqa: F401 

60from .base import state_class_str as state_class_str # noqa: F401 

61from .base import state_str as state_str # noqa: F401 

62from .base import WriteOnlyMapped 

63from .interfaces import CriteriaOption 

64from .interfaces import MapperProperty as MapperProperty 

65from .interfaces import ORMColumnsClauseRole 

66from .interfaces import ORMEntityColumnsClauseRole 

67from .interfaces import ORMFromClauseRole 

68from .path_registry import PathRegistry as PathRegistry 

69from .. import event 

70from .. import exc as sa_exc 

71from .. import inspection 

72from .. import sql 

73from .. import util 

74from ..engine.result import result_tuple 

75from ..sql import coercions 

76from ..sql import expression 

77from ..sql import lambdas 

78from ..sql import roles 

79from ..sql import util as sql_util 

80from ..sql import visitors 

81from ..sql._typing import is_selectable 

82from ..sql.annotation import SupportsCloneAnnotations 

83from ..sql.base import ColumnCollection 

84from ..sql.cache_key import HasCacheKey 

85from ..sql.cache_key import MemoizedHasCacheKey 

86from ..sql.elements import ColumnElement 

87from ..sql.elements import KeyedColumnElement 

88from ..sql.selectable import FromClause 

89from ..util.langhelpers import MemoizedSlots 

90from ..util.typing import de_stringify_annotation as _de_stringify_annotation 

91from ..util.typing import eval_name_only as _eval_name_only 

92from ..util.typing import fixup_container_fwd_refs 

93from ..util.typing import get_origin 

94from ..util.typing import is_origin_of_cls 

95from ..util.typing import Literal 

96from ..util.typing import TupleAny 

97from ..util.typing import Unpack 

98 

99if typing.TYPE_CHECKING: 

100 from ._typing import _EntityType 

101 from ._typing import _IdentityKeyType 

102 from ._typing import _InternalEntityType 

103 from ._typing import _ORMCOLEXPR 

104 from .context import _MapperEntity 

105 from .context import _ORMCompileState 

106 from .mapper import Mapper 

107 from .path_registry import _AbstractEntityRegistry 

108 from .query import Query 

109 from .relationships import RelationshipProperty 

110 from ..engine import Row 

111 from ..engine import RowMapping 

112 from ..sql._typing import _CE 

113 from ..sql._typing import _ColumnExpressionArgument 

114 from ..sql._typing import _EquivalentColumnMap 

115 from ..sql._typing import _FromClauseArgument 

116 from ..sql._typing import _OnClauseArgument 

117 from ..sql._typing import _PropagateAttrsType 

118 from ..sql.annotation import _SA 

119 from ..sql.base import ReadOnlyColumnCollection 

120 from ..sql.elements import BindParameter 

121 from ..sql.selectable import _ColumnsClauseElement 

122 from ..sql.selectable import Select 

123 from ..sql.selectable import Selectable 

124 from ..sql.visitors import anon_map 

125 from ..util.typing import _AnnotationScanType 

126 

# Generic type placeholder used by the typed helpers in this module.
_T = TypeVar("_T", bound=Any)

# Every name accepted in a cascade setting.  Besides the individual
# cascades this includes the "all" and "none" tokens, which CascadeOptions
# expands / clears rather than storing as-is.
all_cascades = frozenset(
    {
        "save-update",
        "merge",
        "expunge",
        "refresh-expire",
        "delete",
        "delete-orphan",
        "all",
        "none",
    }
)

141 

# A partial *of* functools.partial: ``_de_stringify_partial(fn)`` returns
# ``functools.partial(fn, locals_=...)``, i.e. ``fn`` with its ``locals_``
# keyword pre-bound to an immutable mapping of the ORM annotation
# container names to their classes.
_de_stringify_partial = functools.partial(
    functools.partial,
    locals_=util.immutabledict(
        {
            "Mapped": Mapped,
            "WriteOnlyMapped": WriteOnlyMapped,
            "DynamicMapped": DynamicMapped,
        }
    ),
)

152 

# functools.partial buys very little here: the complete signature of each
# wrapped function still has to be written out, and kept in sync, by hand.


class _DeStringifyAnnotation(Protocol):
    """Hand-written call signature that the module-level
    ``de_stringify_annotation`` callable is cast to."""

    def __call__(
        self,
        cls: Type[Any],
        annotation: _AnnotationScanType,
        originating_module: str,
        *,
        str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
        include_generic: bool = False,
    ) -> Type[Any]: ...

167 

168 

# The imported ``_de_stringify_annotation`` with ``locals_`` pre-bound by
# ``_de_stringify_partial``; the cast only restores a usable signature for
# type checkers and has no runtime effect.
de_stringify_annotation = cast(
    _DeStringifyAnnotation,
    _de_stringify_partial(_de_stringify_annotation),
)

172 

173 

class _EvalNameOnly(Protocol):
    """Hand-written call signature that the module-level
    ``eval_name_only`` callable is cast to."""

    def __call__(self, name: str, module_name: str) -> Any: ...

176 

177 

# The imported ``_eval_name_only`` with ``locals_`` pre-bound by
# ``_de_stringify_partial``, cast to its hand-written signature.
eval_name_only = cast(
    _EvalNameOnly,
    _de_stringify_partial(_eval_name_only),
)

179 

180 

class CascadeOptions(FrozenSet[str]):
    """Immutable set of the cascade names given to
    :paramref:`.relationship.cascade`.

    In addition to behaving as a frozenset of strings, each individual
    cascade is exposed as a boolean attribute (``save_update``,
    ``delete``, ``refresh_expire``, ``merge``, ``expunge``,
    ``delete_orphan``).
    """

    # cascades switched on by the "all" token; "delete-orphan" is not
    # among them and has to be requested explicitly
    _add_w_all_cascades = all_cascades.difference(
        ["all", "none", "delete-orphan"]
    )
    _allowed_cascades = all_cascades

    _viewonly_cascades = ["expunge", "all", "none", "refresh-expire", "merge"]

    __slots__ = (
        "save_update",
        "delete",
        "refresh_expire",
        "merge",
        "expunge",
        "delete_orphan",
    )

    save_update: bool
    delete: bool
    refresh_expire: bool
    merge: bool
    expunge: bool
    delete_orphan: bool

    def __new__(
        cls, value_list: Optional[Union[Iterable[str], str]]
    ) -> CascadeOptions:
        """Build the option set.

        :param value_list: an iterable of cascade names, a comma-separated
         string of names, or ``None``; strings and ``None`` are delegated
         to :meth:`from_string`.
        :raises sqlalchemy.exc.ArgumentError: if a name is not one of the
         allowed cascades.
        """
        if value_list is None or isinstance(value_list, str):
            return cls.from_string(value_list)  # type: ignore

        values = set(value_list)

        unknown = values.difference(cls._allowed_cascades)
        if unknown:
            raise sa_exc.ArgumentError(
                "Invalid cascade option(s): %s"
                % ", ".join(repr(name) for name in sorted(unknown))
            )

        # "all" expands to its member cascades, "none" wipes everything
        # collected so far (including that expansion); neither token is
        # kept in the resulting set
        if "all" in values:
            values.update(cls._add_w_all_cascades)
        if "none" in values:
            values.clear()
        values.discard("all")

        self = super().__new__(cls, values)

        # slot names are the cascade names with "-" replaced by "_"
        for token in (
            "save-update",
            "delete",
            "refresh-expire",
            "merge",
            "expunge",
            "delete-orphan",
        ):
            setattr(self, token.replace("-", "_"), token in values)

        if self.delete_orphan and not self.delete:
            util.warn("The 'delete-orphan' cascade option requires 'delete'.")
        return self

    def __repr__(self):
        names = ",".join(sorted(self))
        return "CascadeOptions(%r)" % names

    @classmethod
    def from_string(cls, arg):
        """Build the option set from a comma-separated string.

        Whitespace around commas is ignored and empty entries are dropped;
        ``None`` or an empty string yields an empty set.
        """
        tokens = re.split(r"\s*,\s*", arg or "")
        return cls(list(filter(None, tokens)))

252 

253 

def _validator_events(desc, key, validator, include_removes, include_backrefs):
    """Register attribute event listeners on ``desc`` that pass values
    being set or appended through ``validator``.

    Listeners are established for the "append", "bulk_replace" and "set"
    events, plus "remove" when ``include_removes`` is true.

    :param desc: event target handed to ``event.listen()``.
    :param key: attribute key; passed to the validator and used to look up
     the attribute's impl on the state's manager.
    :param validator: callable invoked as ``validator(obj, key, value)``,
     or as ``validator(obj, key, value, is_remove)`` when
     ``include_removes`` is true.
    :param include_removes: when true, the validator receives the extra
     boolean argument and is also invoked for "remove" events.
    :param include_backrefs: when false, events whose initiator impl is
     not this attribute's own impl are left unvalidated.
    """

    def should_validate(state, initiator):
        # with include_backrefs every event is validated; otherwise only
        # those initiated by this attribute's own impl, as opposed to ones
        # arriving by way of a backref
        if include_backrefs:
            return True
        impl = state.manager[key].impl
        return initiator.impl is impl

    if include_removes:

        def run_validator(obj, value):
            # four-argument validator form; False means "not a removal"
            return validator(obj, key, value, False)

    else:

        def run_validator(obj, value):
            return validator(obj, key, value)

    def append(state, value, initiator):
        # appends issued as part of a bulk replace are left alone here;
        # bulk_set() below handles the whole collection
        if initiator.op is attributes.OP_BULK_REPLACE:
            return value
        if not should_validate(state, initiator):
            return value
        return run_validator(state.obj(), value)

    def bulk_set(state, values, initiator):
        if not should_validate(state, initiator):
            return
        obj = state.obj()
        # replace contents in place so the caller's list is updated
        values[:] = [run_validator(obj, value) for value in values]

    def set_(state, value, oldvalue, initiator):
        if not should_validate(state, initiator):
            return value
        return run_validator(state.obj(), value)

    event.listen(desc, "append", append, raw=True, retval=True)
    event.listen(desc, "bulk_replace", bulk_set, raw=True)
    event.listen(desc, "set", set_, raw=True, retval=True)

    if include_removes:

        def remove(state, value, initiator):
            if should_validate(state, initiator):
                validator(state.obj(), key, value, True)

        event.listen(desc, "remove", remove, raw=True, retval=True)

318 

319 

def polymorphic_union(
    table_map, typecolname, aliasname="p_union", cast_nulls=True
):
    """Build the ``UNION ALL`` selectable used by a polymorphic mapper.

    One SELECT is produced per entry of ``table_map``; each SELECT lists
    the union of all column keys found across every table, substituting a
    labeled NULL for columns a particular table does not have.

    See :ref:`concrete_inheritance` for an example of how
    this is used.

    :param table_map: mapping of polymorphic identities to
     :class:`_schema.Table` objects.  Each value is coerced to a FROM
     clause and written back into the mapping.
    :param typecolname: string name of a "discriminator" column added to
     every SELECT, carrying the polymorphic identity of the row.  If
     ``None``, no discriminator is generated.
    :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()`
     construct generated.
    :param cast_nulls: if True, the labeled NULLs standing in for missing
     columns are wrapped in CAST; if False, ``type_coerce()`` is used
     instead.  CAST is a legacy behavior that is problematic on some
     backends such as Oracle.
    :raises sqlalchemy.exc.InvalidRequestError: if a table column's key
     equals ``typecolname``.

    """

    colnames: util.OrderedSet[str] = util.OrderedSet()
    columns_by_table = {}
    coltypes = {}

    for identity in table_map:
        selectable = coercions.expect(
            roles.FromClauseRole, table_map[identity]
        )
        table_map[identity] = selectable

        by_key = {}
        for column in selectable.c:
            if column.key == typecolname:
                raise sa_exc.InvalidRequestError(
                    "Polymorphic union can't use '%s' as the discriminator "
                    "column due to mapped column %r; please apply the "
                    "'typecolname' "
                    "argument; this is available on "
                    "ConcreteBase as '_concrete_discriminator_name'"
                    % (typecolname, column)
                )
            colnames.add(column.key)
            by_key[column.key] = column
            coltypes[column.key] = column.type
        columns_by_table[selectable] = by_key

    # how a column missing from a given table is rendered as a typed NULL
    typed_null = sql.cast if cast_nulls else sql.type_coerce

    def col(name, table):
        try:
            return columns_by_table[table][name]
        except KeyError:
            return typed_null(sql.null(), coltypes[name]).label(name)

    selects = []
    for type_, table in table_map.items():
        columns = [col(name, table) for name in colnames]
        if typecolname is not None:
            columns.append(
                sql.literal_column(sql_util._quote_ddl_expr(type_)).label(
                    typecolname
                )
            )
        selects.append(sql.select(*columns).select_from(table))

    return sql.union_all(*selects).alias(aliasname)

398 

399 

def identity_key(
    class_: Optional[Type[_T]] = None,
    ident: Union[Any, Tuple[Any, ...]] = None,
    *,
    instance: Optional[_T] = None,
    row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None,
    identity_token: Optional[Any] = None,
) -> _IdentityKeyType[_T]:
    """Produce an "identity key" tuple of the kind used as keys in the
    :attr:`.Session.identity_map` dictionary.

    Three call styles are supported:

    * ``identity_key(class, ident, identity_token=token)``

      Given a mapped class and a primary key, which may be a scalar or a
      tuple.

      E.g.::

        >>> identity_key(MyClass, (1, 2))
        (<class '__main__.MyClass'>, (1, 2), None)

      :param class: mapped class (must be a positional argument)
      :param ident: primary key, may be a scalar or tuple argument.
      :param identity_token: optional identity token

    * ``identity_key(instance=instance)``

      Given an instance, produce its identity key.  The instance does not
      have to be persistent; it only needs its primary key attributes
      populated (missing values appear as ``None`` in the key).

      E.g.::

        >>> instance = MyClass(1, 2)
        >>> identity_key(instance=instance)
        (<class '__main__.MyClass'>, (1, 2), None)

      The instance is passed to
      :meth:`_orm.Mapper.identity_key_from_instance`, which will perform
      a database check for the corresponding row if the object is
      expired.

      :param instance: object instance (must be given as a keyword arg)

    * ``identity_key(class, row=row, identity_token=token)``

      Like the class / primary key form, but the primary key is taken
      from a database result row given as a :class:`.Row` or
      :class:`.RowMapping` object.

      E.g.::

        >>> row = engine.execute(
        ...     text("select * from table where a=1 and b=2")
        ... ).first()
        >>> identity_key(MyClass, row=row)
        (<class '__main__.MyClass'>, (1, 2), None)

      :param class: mapped class (must be a positional argument)
      :param row: :class:`.Row` row returned by a
       :class:`_engine.CursorResult` (must be given as a keyword arg)
      :param identity_token: optional identity token

    :raises sqlalchemy.exc.ArgumentError: if neither a class nor an
     instance is given, or if a class is given without ``ident`` or
     ``row``.

    """
    if class_ is None:
        # instance form
        if instance is None:
            raise sa_exc.ArgumentError("class or instance is required")
        return object_mapper(instance).identity_key_from_instance(instance)

    mapper = class_mapper(class_)

    # row form takes precedence over ident when both are supplied
    if row is not None:
        return mapper.identity_key_from_row(
            row, identity_token=identity_token
        )

    if ident is None:
        raise sa_exc.ArgumentError("ident or row is required")

    primary_key = tuple(util.to_list(ident))
    return mapper.identity_key_from_primary_key(
        primary_key, identity_token=identity_token
    )

481 

482 

class _TraceAdaptRole(enum.Enum):
    """Catalog of every use case for ORMAdapter.

    ORMAdapter is among the most complicated parts of the ORM.  It adapts
    column expressions in place as they are applied to a SELECT, swapping
    :class:`.Table` and other mapped objects for aliases of those tables
    (joined eager loading) or for entire user-defined subqueries
    (polymorphic loading with concrete mappings or other custom "with
    polymorphic" parameters).

    The members below serve three purposes: they give an overview of all
    the places ORMAdapter is used, they add a layer of formality to
    introducing a new use case (none are anticipated), and they make it
    possible to trace where a particular ORMAdapter came from when
    debugging at runtime.

    SQLAlchemy 2.0 has greatly scaled back the ORM features that leaned
    on open-ended statement adaption, including the
    ``Query.with_polymorphic()`` and ``Query.select_from_entity()``
    methods, in favor of user-explicit aliasing through the standalone
    ``aliased()`` and ``with_polymorphic()`` constructs.  Those still use
    adaption, but in a narrower scope.

    """

    # --- aliased(): adapts individual attributes while the query is
    # being constructed
    ALIASED_INSP = enum.auto()

    # --- joinedload: typically adapts the ON clause of a relationship
    # join
    JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto()
    JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto()
    JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto()

    # --- polymorphic: the complex cases, which replace FROM clauses by
    # substituting subqueries for tables
    MAPPER_POLYMORPHIC_ADAPTER = enum.auto()
    WITH_POLYMORPHIC_ADAPTER = enum.auto()
    WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto()
    DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto()

    # --- from_statement(): only adapts individual attributes of a given
    # statement to local ORM attributes, at result fetching time.
    # assigned to ORMCompileState._from_obj_alias
    ADAPT_FROM_STATEMENT = enum.auto()

    # --- joinedload combined with LIMIT/OFFSET/DISTINCT: the query is
    # wrapped in a subquery carrying the LIMIT/OFFSET/etc. and the
    # joinedloads are applied on the outside.
    # assigned to ORMCompileState.compound_eager_adapter
    COMPOUND_EAGER_STATEMENT = enum.auto()

    # --- legacy Query._set_select_from(): required by Query's set
    # operations (i.e. UNION, etc.) and by "legacy from_self()", which is
    # no longer public API as of 2.0 but still backs Query.count().  this
    # case still performs full statement traversal.
    # assigned to ORMCompileState._from_obj_alias
    LEGACY_SELECT_FROM_ALIAS = enum.auto()

541 

542 

class ORMStatementAdapter(sql_util.ColumnAdapter):
    """A ColumnAdapter that additionally records the
    :class:`._TraceAdaptRole` it was created for."""

    __slots__ = ("role",)

    def __init__(
        self,
        role: _TraceAdaptRole,
        selectable: Selectable,
        *,
        equivalents: Optional[_EquivalentColumnMap] = None,
        adapt_required: bool = False,
        allow_label_resolve: bool = True,
        anonymize_labels: bool = False,
        adapt_on_names: bool = False,
        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
    ):
        """Store ``role`` and forward every other argument, unchanged, to
        the ColumnAdapter constructor."""
        self.role = role

        super().__init__(
            selectable,
            equivalents=equivalents,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            adapt_on_names=adapt_on_names,
            adapt_from_selectables=adapt_from_selectables,
        )

570 

571 

class ORMAdapter(sql_util.ColumnAdapter):
    """ColumnAdapter for a mapped entity which, by default, declines to
    adapt elements that belong to mappers unrelated to that entity.

    """

    __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp")

    is_aliased_class: bool
    aliased_insp: Optional[AliasedInsp[Any]]

    def __init__(
        self,
        role: _TraceAdaptRole,
        entity: _InternalEntityType[Any],
        *,
        equivalents: Optional[_EquivalentColumnMap] = None,
        adapt_required: bool = False,
        allow_label_resolve: bool = True,
        anonymize_labels: bool = False,
        selectable: Optional[Selectable] = None,
        limit_on_entity: bool = True,
        adapt_on_names: bool = False,
        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
    ):
        """Set up the adapter for ``entity``.

        :param role: the use case this adapter serves.
        :param entity: inspected entity; supplies the mapper and, unless
         ``selectable`` is given, the selectable to adapt to.
        :param selectable: optional selectable used instead of
         ``entity.selectable``.
        :param limit_on_entity: when true (the default), ``_include_fn``
         is installed as the adapter's ``include_fn``; when false no
         ``include_fn`` is used.

        The remaining keyword arguments are handed to the ColumnAdapter
        constructor unchanged.
        """
        self.role = role
        self.mapper = entity.mapper

        if insp_is_aliased_class(entity):
            self.is_aliased_class = True
            self.aliased_insp = entity
        else:
            self.is_aliased_class = False
            self.aliased_insp = None

        if selectable is None:
            selectable = entity.selectable

        include_fn = self._include_fn if limit_on_entity else None

        super().__init__(
            selectable,
            equivalents=equivalents,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            include_fn=include_fn,
            adapt_on_names=adapt_on_names,
            adapt_from_selectables=adapt_from_selectables,
        )

    def _include_fn(self, elem):
        """Decide whether ``elem`` is eligible for adaption.

        Elements lacking a (truthy) "parentmapper" annotation are always
        eligible; annotated ones only if that mapper and this adapter's
        mapper are related through ``isa()`` in either direction.
        """
        parent = elem._annotations.get("parentmapper", None)
        if not parent:
            return True

        return parent.isa(self.mapper) or self.mapper.isa(parent)

623 

624 

class AliasedClass(
    inspection.Inspectable["AliasedInsp[_O]"], ORMColumnsClauseRole[_O]
):
    r"""An "aliased" form of a mapped class, for use with Query.

    This is the ORM counterpart of the
    :func:`~sqlalchemy.sql.expression.alias` construct.  The object
    imitates the mapped class by means of a ``__getattr__`` scheme while
    holding on to a real :class:`~sqlalchemy.sql.expression.Alias`
    object.

    Its main use is to stand in for a mapped entity inside a SQL
    statement generated by the ORM, so that the same entity can take
    part in the statement more than once.  A simple example::

        # find all pairs of users with the same name
        user_alias = aliased(User)
        session.query(User, user_alias).join(
            (user_alias, User.id > user_alias.id)
        ).filter(User.name == user_alias.name)

    An :class:`.AliasedClass` can also map an existing mapped class onto
    an entirely different selectable, as long as that selectable is
    column-compatible with the existing mapped selectable, and it can be
    set up in a mapping as the target of a :func:`_orm.relationship`.
    See the links below for examples.

    Instances are normally created with the :func:`_orm.aliased`
    function; :func:`_orm.with_polymorphic` produces them as well, with
    additional configuration.

    The attribute scheme implemented here yields the same attribute and
    method interface as the original mapped class, which keeps
    :class:`.AliasedClass` compatible with any attribute technique that
    works on the original class, including hybrid attributes (see
    :ref:`hybrids_toplevel`).

    The underlying :class:`_orm.Mapper`, the aliased selectable and other
    details are available through :func:`_sa.inspect`::

        from sqlalchemy import inspect

        my_alias = aliased(MyClass)
        insp = inspect(my_alias)

    The object returned by the inspection is an :class:`.AliasedInsp`.


    .. seealso::

        :func:`.aliased`

        :func:`.with_polymorphic`

        :ref:`relationship_aliased_class`

        :ref:`relationship_to_window_function`


    """

    __name__: str

    def __init__(
        self,
        mapped_class_or_ac: _EntityType[_O],
        alias: Optional[FromClause] = None,
        name: Optional[str] = None,
        flat: bool = False,
        adapt_on_names: bool = False,
        with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]] = None,
        with_polymorphic_discriminator: Optional[ColumnElement[Any]] = None,
        base_alias: Optional[AliasedInsp[Any]] = None,
        use_mapper_path: bool = False,
        represents_outer_join: bool = False,
    ):
        insp = cast(
            "_InternalEntityType[_O]", inspection.inspect(mapped_class_or_ac)
        )
        mapper = insp.mapper

        nest_adapters = False

        if alias is not None:
            # an explicit selectable laid over something that is already
            # aliased: the two adapters have to be nested
            nest_adapters = bool(insp.is_aliased_class)
        elif insp.is_aliased_class and insp.selectable._is_subquery:
            alias = insp.selectable.alias()
        else:
            poly_selectable = mapper._with_polymorphic_selectable
            alias = poly_selectable._anonymous_fromclause(
                name=name,
                flat=flat,
            )

        assert alias is not None

        # explicit arguments win; otherwise fall back to the mapper's own
        # polymorphic configuration
        poly_mappers = (
            with_polymorphic_mappers or mapper.with_polymorphic_mappers
        )
        if with_polymorphic_discriminator is not None:
            discriminator = with_polymorphic_discriminator
        else:
            discriminator = mapper.polymorphic_on

        self._aliased_insp = AliasedInsp(
            self,
            insp,
            alias,
            name,
            poly_mappers,
            discriminator,
            base_alias,
            use_mapper_path,
            adapt_on_names,
            represents_outer_join,
            nest_adapters,
        )

        self.__name__ = f"aliased({mapper.class_.__name__})"

    @classmethod
    def _reconstitute_from_aliased_insp(
        cls, aliased_insp: AliasedInsp[_O]
    ) -> AliasedClass[_O]:
        """Create an AliasedClass around an existing AliasedInsp without
        running ``__init__``.

        For a with-polymorphic inspection, an AliasedClass is rebuilt for
        each sub-entity as well and attached as an attribute named after
        the sub-entity's class.
        """
        obj = cls.__new__(cls)
        obj.__name__ = f"aliased({aliased_insp.mapper.class_.__name__})"
        obj._aliased_insp = aliased_insp

        if not aliased_insp._is_with_polymorphic:
            return obj

        for sub_insp in aliased_insp._with_polymorphic_entities:
            if sub_insp is aliased_insp:
                continue
            sub_entity = AliasedClass._reconstitute_from_aliased_insp(
                sub_insp
            )
            setattr(obj, sub_insp.class_.__name__, sub_entity)

        return obj

    def __getattr__(self, key: str) -> Any:
        # read the instance dict directly; going through normal attribute
        # access for "_aliased_insp" would re-enter __getattr__ while the
        # attribute is not yet set
        try:
            aliased_insp = self.__dict__["_aliased_insp"]
        except KeyError:
            raise AttributeError()

        # plain getattr() on the target so that all the usual attribute
        # mechanics stay in effect
        attr = getattr(aliased_insp._target, key)

        # a bound method: hand back the same function bound to this
        # object instead
        is_bound_method = hasattr(attr, "__call__") and hasattr(
            attr, "__self__"
        )
        if is_bound_method:
            return types.MethodType(attr.__func__, self)

        # a descriptor: invoke it with this object as the owner
        if hasattr(attr, "__get__"):
            attr = attr.__get__(None, self)

        # QueryableAttribute-style objects need to be adapted to this
        # entity; the adapted form is stored on the instance so that
        # __getattr__ is not consulted for this key again
        if hasattr(attr, "adapt_to_entity"):
            attr = attr.adapt_to_entity(aliased_insp)
            setattr(self, key, attr)

        return attr

    def _get_from_serialized(
        self, key: str, mapped_class: _O, aliased_insp: AliasedInsp[_O]
    ) -> Any:
        """Variant of the ``__getattr__`` lookup that takes the mapped
        class and the AliasedInsp as arguments.

        Used only by the sqlalchemy.ext.serializer extension.
        """
        attr = getattr(mapped_class, key)

        is_bound_method = hasattr(attr, "__call__") and hasattr(
            attr, "__self__"
        )
        if is_bound_method:
            return types.MethodType(attr.__func__, self)

        # a descriptor: invoke it with this object as the owner
        if hasattr(attr, "__get__"):
            attr = attr.__get__(None, self)

        # QueryableAttribute-style objects need to be adapted to this
        # entity; the given AliasedInsp is first pointed back at this
        # object
        if hasattr(attr, "adapt_to_entity"):
            aliased_insp._weak_entity = weakref.ref(self)
            attr = attr.adapt_to_entity(aliased_insp)
            setattr(self, key, attr)

        return attr

    def __repr__(self) -> str:
        target_name = self._aliased_insp._target.__name__
        return "<AliasedClass at 0x%x; %s>" % (
            id(self),
            target_name,
        )

    def __str__(self) -> str:
        return str(self._aliased_insp)

825 

826 

827@inspection._self_inspects 

828class AliasedInsp( 

829 ORMEntityColumnsClauseRole[_O], 

830 ORMFromClauseRole, 

831 HasCacheKey, 

832 InspectionAttr, 

833 MemoizedSlots, 

834 inspection.Inspectable["AliasedInsp[_O]"], 

835 Generic[_O], 

836): 

837 """Provide an inspection interface for an 

838 :class:`.AliasedClass` object. 

839 

840 The :class:`.AliasedInsp` object is returned 

841 given an :class:`.AliasedClass` using the 

842 :func:`_sa.inspect` function:: 

843 

844 from sqlalchemy import inspect 

845 from sqlalchemy.orm import aliased 

846 

847 my_alias = aliased(MyMappedClass) 

848 insp = inspect(my_alias) 

849 

850 Attributes on :class:`.AliasedInsp` 

851 include: 

852 

853 * ``entity`` - the :class:`.AliasedClass` represented. 

854 * ``mapper`` - the :class:`_orm.Mapper` mapping the underlying class. 

855 * ``selectable`` - the :class:`_expression.Alias` 

856 construct which ultimately 

857 represents an aliased :class:`_schema.Table` or 

858 :class:`_expression.Select` 

859 construct. 

860 * ``name`` - the name of the alias. Also is used as the attribute 

861 name when returned in a result tuple from :class:`_query.Query`. 

862 * ``with_polymorphic_mappers`` - collection of :class:`_orm.Mapper` 

863 objects 

864 indicating all those mappers expressed in the select construct 

865 for the :class:`.AliasedClass`. 

866 * ``polymorphic_on`` - an alternate column or SQL expression which 

867 will be used as the "discriminator" for a polymorphic load. 

868 

869 .. seealso:: 

870 

871 :ref:`inspection_toplevel` 

872 

873 """ 

874 

    # Names of the per-instance slots.  "__weakref__" is listed so that
    # weakref.ref() can be taken against an instance, which the
    # constructor does for ``_base_alias``.
    __slots__ = (
        "__weakref__",
        "_weak_entity",
        "mapper",
        "selectable",
        "name",
        "_adapt_on_names",
        "with_polymorphic_mappers",
        "polymorphic_on",
        "_use_mapper_path",
        "_base_alias",
        "represents_outer_join",
        "persist_selectable",
        "local_table",
        "_is_with_polymorphic",
        "_with_polymorphic_entities",
        "_adapter",
        "_target",
        "__clause_element__",
        "_memoized_values",
        "_all_column_expressions",
        "_nest_adapters",
    )

    # (attribute name, traversal symbol) pairs; presumably consumed by the
    # cache key machinery to decide which attributes contribute to the key
    # -- TODO confirm against the HasCacheKey implementation.
    _cache_key_traversal = [
        ("name", visitors.ExtendedInternalTraversal.dp_string),
        ("_adapt_on_names", visitors.ExtendedInternalTraversal.dp_boolean),
        ("_use_mapper_path", visitors.ExtendedInternalTraversal.dp_boolean),
        ("_target", visitors.ExtendedInternalTraversal.dp_inspectable),
        ("selectable", visitors.ExtendedInternalTraversal.dp_clauseelement),
        (
            "with_polymorphic_mappers",
            visitors.InternalTraversal.dp_has_cache_key_list,
        ),
        ("polymorphic_on", visitors.InternalTraversal.dp_clauseelement),
    ]

    # Typed declarations for slots assigned in the constructor.
    mapper: Mapper[_O]
    selectable: FromClause
    _adapter: ORMAdapter
    with_polymorphic_mappers: Sequence[Mapper[Any]]
    _with_polymorphic_entities: Sequence[AliasedInsp[Any]]

    _weak_entity: weakref.ref[AliasedClass[_O]]
    """the AliasedClass that refers to this AliasedInsp"""

    _target: Union[Type[_O], AliasedClass[_O]]
    """the thing referenced by the AliasedClass/AliasedInsp.

    In the vast majority of cases, this is the mapped class. However
    it may also be another AliasedClass (alias of alias).

    """

928 

929 def __init__( 

930 self, 

931 entity: AliasedClass[_O], 

932 inspected: _InternalEntityType[_O], 

933 selectable: FromClause, 

934 name: Optional[str], 

935 with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]], 

936 polymorphic_on: Optional[ColumnElement[Any]], 

937 _base_alias: Optional[AliasedInsp[Any]], 

938 _use_mapper_path: bool, 

939 adapt_on_names: bool, 

940 represents_outer_join: bool, 

941 nest_adapters: bool, 

942 ): 

943 mapped_class_or_ac = inspected.entity 

944 mapper = inspected.mapper 

945 

946 self._weak_entity = weakref.ref(entity) 

947 self.mapper = mapper 

948 self.selectable = self.persist_selectable = self.local_table = ( 

949 selectable 

950 ) 

951 self.name = name 

952 self.polymorphic_on = polymorphic_on 

953 self._base_alias = weakref.ref(_base_alias or self) 

954 self._use_mapper_path = _use_mapper_path 

955 self.represents_outer_join = represents_outer_join 

956 self._nest_adapters = nest_adapters 

957 

958 if with_polymorphic_mappers: 

959 self._is_with_polymorphic = True 

960 self.with_polymorphic_mappers = with_polymorphic_mappers 

961 self._with_polymorphic_entities = [] 

962 for poly in self.with_polymorphic_mappers: 

963 if poly is not mapper: 

964 ent = AliasedClass( 

965 poly.class_, 

966 selectable, 

967 base_alias=self, 

968 adapt_on_names=adapt_on_names, 

969 use_mapper_path=_use_mapper_path, 

970 ) 

971 

972 setattr(self.entity, poly.class_.__name__, ent) 

973 self._with_polymorphic_entities.append(ent._aliased_insp) 

974 

975 else: 

976 self._is_with_polymorphic = False 

977 self.with_polymorphic_mappers = [mapper] 

978 

979 self._adapter = ORMAdapter( 

980 _TraceAdaptRole.ALIASED_INSP, 

981 mapper, 

982 selectable=selectable, 

983 equivalents=mapper._equivalent_columns, 

984 adapt_on_names=adapt_on_names, 

985 anonymize_labels=True, 

986 # make sure the adapter doesn't try to grab other tables that 

987 # are not even the thing we are mapping, such as embedded 

988 # selectables in subqueries or CTEs. See issue #6060 

989 adapt_from_selectables={ 

990 m.selectable 

991 for m in self.with_polymorphic_mappers 

992 if not adapt_on_names 

993 }, 

994 limit_on_entity=False, 

995 ) 

996 

997 if nest_adapters: 

998 # supports "aliased class of aliased class" use case 

999 assert isinstance(inspected, AliasedInsp) 

1000 self._adapter = inspected._adapter.wrap(self._adapter) 

1001 

1002 self._adapt_on_names = adapt_on_names 

1003 self._target = mapped_class_or_ac 

1004 

1005 @classmethod 

1006 def _alias_factory( 

1007 cls, 

1008 element: Union[_EntityType[_O], FromClause], 

1009 alias: Optional[FromClause] = None, 

1010 name: Optional[str] = None, 

1011 flat: bool = False, 

1012 adapt_on_names: bool = False, 

1013 ) -> Union[AliasedClass[_O], FromClause]: 

1014 if isinstance(element, FromClause): 

1015 if adapt_on_names: 

1016 raise sa_exc.ArgumentError( 

1017 "adapt_on_names only applies to ORM elements" 

1018 ) 

1019 if name: 

1020 return element.alias(name=name, flat=flat) 

1021 else: 

1022 return coercions.expect( 

1023 roles.AnonymizedFromClauseRole, element, flat=flat 

1024 ) 

1025 else: 

1026 return AliasedClass( 

1027 element, 

1028 alias=alias, 

1029 flat=flat, 

1030 name=name, 

1031 adapt_on_names=adapt_on_names, 

1032 ) 

1033 

1034 @classmethod 

1035 def _with_polymorphic_factory( 

1036 cls, 

1037 base: Union[Type[_O], Mapper[_O]], 

1038 classes: Union[Literal["*"], Iterable[_EntityType[Any]]], 

1039 selectable: Union[Literal[False, None], FromClause] = False, 

1040 flat: bool = False, 

1041 polymorphic_on: Optional[ColumnElement[Any]] = None, 

1042 aliased: bool = False, 

1043 innerjoin: bool = False, 

1044 adapt_on_names: bool = False, 

1045 name: Optional[str] = None, 

1046 _use_mapper_path: bool = False, 

1047 ) -> AliasedClass[_O]: 

1048 primary_mapper = _class_to_mapper(base) 

1049 

1050 if selectable not in (None, False) and flat: 

1051 raise sa_exc.ArgumentError( 

1052 "the 'flat' and 'selectable' arguments cannot be passed " 

1053 "simultaneously to with_polymorphic()" 

1054 ) 

1055 

1056 mappers, selectable = primary_mapper._with_polymorphic_args( 

1057 classes, selectable, innerjoin=innerjoin 

1058 ) 

1059 if aliased or flat: 

1060 assert selectable is not None 

1061 selectable = selectable._anonymous_fromclause(flat=flat) 

1062 

1063 return AliasedClass( 

1064 base, 

1065 selectable, 

1066 name=name, 

1067 with_polymorphic_mappers=mappers, 

1068 adapt_on_names=adapt_on_names, 

1069 with_polymorphic_discriminator=polymorphic_on, 

1070 use_mapper_path=_use_mapper_path, 

1071 represents_outer_join=not innerjoin, 

1072 ) 

1073 

1074 @property 

1075 def entity(self) -> AliasedClass[_O]: 

1076 # to eliminate reference cycles, the AliasedClass is held weakly. 

1077 # this produces some situations where the AliasedClass gets lost, 

1078 # particularly when one is created internally and only the AliasedInsp 

1079 # is passed around. 

1080 # to work around this case, we just generate a new one when we need 

1081 # it, as it is a simple class with very little initial state on it. 

1082 ent = self._weak_entity() 

1083 if ent is None: 

1084 ent = AliasedClass._reconstitute_from_aliased_insp(self) 

1085 self._weak_entity = weakref.ref(ent) 

1086 return ent 

1087 

    # class-level constant flag; the string below is its attribute docstring
    is_aliased_class = True
    "always returns True"

1090 

1091 def _memoized_method___clause_element__(self) -> FromClause: 

1092 return self.selectable._annotate( 

1093 { 

1094 "parentmapper": self.mapper, 

1095 "parententity": self, 

1096 "entity_namespace": self, 

1097 } 

1098 )._set_propagate_attrs( 

1099 {"compile_state_plugin": "orm", "plugin_subject": self} 

1100 ) 

1101 

    @property
    def entity_namespace(self) -> AliasedClass[_O]:
        """Return the :class:`.AliasedClass` from :attr:`.entity`."""
        return self.entity

1105 

    @property
    def class_(self) -> Type[_O]:
        """Return the mapped class ultimately represented by this
        :class:`.AliasedInsp`."""
        # delegates to the mapper stored at construction time
        return self.mapper.class_

1111 

1112 @property 

1113 def _path_registry(self) -> _AbstractEntityRegistry: 

1114 if self._use_mapper_path: 

1115 return self.mapper._path_registry 

1116 else: 

1117 return PathRegistry.per_mapper(self) 

1118 

1119 def __getstate__(self) -> Dict[str, Any]: 

1120 return { 

1121 "entity": self.entity, 

1122 "mapper": self.mapper, 

1123 "alias": self.selectable, 

1124 "name": self.name, 

1125 "adapt_on_names": self._adapt_on_names, 

1126 "with_polymorphic_mappers": self.with_polymorphic_mappers, 

1127 "with_polymorphic_discriminator": self.polymorphic_on, 

1128 "base_alias": self._base_alias(), 

1129 "use_mapper_path": self._use_mapper_path, 

1130 "represents_outer_join": self.represents_outer_join, 

1131 "nest_adapters": self._nest_adapters, 

1132 } 

1133 

1134 def __setstate__(self, state: Dict[str, Any]) -> None: 

1135 self.__init__( # type: ignore 

1136 state["entity"], 

1137 state["mapper"], 

1138 state["alias"], 

1139 state["name"], 

1140 state["with_polymorphic_mappers"], 

1141 state["with_polymorphic_discriminator"], 

1142 state["base_alias"], 

1143 state["use_mapper_path"], 

1144 state["adapt_on_names"], 

1145 state["represents_outer_join"], 

1146 state["nest_adapters"], 

1147 ) 

1148 

1149 def _merge_with(self, other: AliasedInsp[_O]) -> AliasedInsp[_O]: 

1150 # assert self._is_with_polymorphic 

1151 # assert other._is_with_polymorphic 

1152 

1153 primary_mapper = other.mapper 

1154 

1155 assert self.mapper is primary_mapper 

1156 

1157 our_classes = util.to_set( 

1158 mp.class_ for mp in self.with_polymorphic_mappers 

1159 ) 

1160 new_classes = {mp.class_ for mp in other.with_polymorphic_mappers} 

1161 if our_classes == new_classes: 

1162 return other 

1163 else: 

1164 classes = our_classes.union(new_classes) 

1165 

1166 mappers, selectable = primary_mapper._with_polymorphic_args( 

1167 classes, None, innerjoin=not other.represents_outer_join 

1168 ) 

1169 selectable = selectable._anonymous_fromclause(flat=True) 

1170 return AliasedClass( 

1171 primary_mapper, 

1172 selectable, 

1173 with_polymorphic_mappers=mappers, 

1174 with_polymorphic_discriminator=other.polymorphic_on, 

1175 use_mapper_path=other._use_mapper_path, 

1176 represents_outer_join=other.represents_outer_join, 

1177 )._aliased_insp 

1178 

1179 def _adapt_element( 

1180 self, expr: _ORMCOLEXPR, key: Optional[str] = None 

1181 ) -> _ORMCOLEXPR: 

1182 assert isinstance(expr, ColumnElement) 

1183 d: Dict[str, Any] = { 

1184 "parententity": self, 

1185 "parentmapper": self.mapper, 

1186 } 

1187 if key: 

1188 d["proxy_key"] = key 

1189 

1190 # IMO mypy should see this one also as returning the same type 

1191 # we put into it, but it's not 

1192 return ( 

1193 self._adapter.traverse(expr) 

1194 ._annotate(d) 

1195 ._set_propagate_attrs( 

1196 {"compile_state_plugin": "orm", "plugin_subject": self} 

1197 ) 

1198 ) 

1199 

    if TYPE_CHECKING:
        # establish compatibility with the _ORMAdapterProto protocol,
        # which in turn is compatible with _CoreAdapterProto.
        # Only type checkers see this stub signature.

        def _orm_adapt_element(
            self,
            obj: _CE,
            key: Optional[str] = None,
        ) -> _CE: ...

    else:
        # at runtime the name is simply bound to _adapt_element
        _orm_adapt_element = _adapt_element

1212 

1213 def _entity_for_mapper(self, mapper): 

1214 self_poly = self.with_polymorphic_mappers 

1215 if mapper in self_poly: 

1216 if mapper is self.mapper: 

1217 return self 

1218 else: 

1219 return getattr( 

1220 self.entity, mapper.class_.__name__ 

1221 )._aliased_insp 

1222 elif mapper.isa(self.mapper): 

1223 return self 

1224 else: 

1225 assert False, "mapper %s doesn't correspond to %s" % (mapper, self) 

1226 

1227 def _memoized_attr__get_clause(self): 

1228 onclause, replacemap = self.mapper._get_clause 

1229 return ( 

1230 self._adapter.traverse(onclause), 

1231 { 

1232 self._adapter.traverse(col): param 

1233 for col, param in replacemap.items() 

1234 }, 

1235 ) 

1236 

    def _memoized_attr__memoized_values(self):
        """Return a new empty dict; ``_memo`` stores its results in it."""
        return {}

1239 

1240 def _memoized_attr__all_column_expressions(self): 

1241 if self._is_with_polymorphic: 

1242 cols_plus_keys = self.mapper._columns_plus_keys( 

1243 [ent.mapper for ent in self._with_polymorphic_entities] 

1244 ) 

1245 else: 

1246 cols_plus_keys = self.mapper._columns_plus_keys() 

1247 

1248 cols_plus_keys = [ 

1249 (key, self._adapt_element(col)) for key, col in cols_plus_keys 

1250 ] 

1251 

1252 return ColumnCollection(cols_plus_keys) 

1253 

1254 def _memo(self, key, callable_, *args, **kw): 

1255 if key in self._memoized_values: 

1256 return self._memoized_values[key] 

1257 else: 

1258 self._memoized_values[key] = value = callable_(*args, **kw) 

1259 return value 

1260 

1261 def __repr__(self): 

1262 if self.with_polymorphic_mappers: 

1263 with_poly = "(%s)" % ", ".join( 

1264 mp.class_.__name__ for mp in self.with_polymorphic_mappers 

1265 ) 

1266 else: 

1267 with_poly = "" 

1268 return "<AliasedInsp at 0x%x; %s%s>" % ( 

1269 id(self), 

1270 self.class_.__name__, 

1271 with_poly, 

1272 ) 

1273 

1274 def __str__(self): 

1275 if self._is_with_polymorphic: 

1276 return "with_polymorphic(%s, [%s])" % ( 

1277 self._target.__name__, 

1278 ", ".join( 

1279 mp.class_.__name__ 

1280 for mp in self.with_polymorphic_mappers 

1281 if mp is not self.mapper 

1282 ), 

1283 ) 

1284 else: 

1285 return "aliased(%s)" % (self._target.__name__,) 

1286 

1287 

class _WrapUserEntity:
    """Proxy handed to loader_criteria lambdas in place of the entity.

    Attribute access is forwarded to the wrapped subject, except that a
    ``declared_attr`` found directly in the subject's ``__dict__`` is
    invoked through its ``fget`` rather than through normal descriptor
    access.  This bypasses declared_attr descriptors on unmapped mixins,
    which normally emit a warning for such use.

    might also be useful for other per-lambda instrumentations should
    the need arise.

    """

    __slots__ = ("subject",)

    def __init__(self, subject):
        self.subject = subject

    @util.preload_module("sqlalchemy.orm.decl_api")
    def __getattribute__(self, name):
        decl_api = util.preloaded.orm.decl_api

        # go through object.__getattribute__ so that reading "subject"
        # does not re-enter this method
        subject = object.__getattribute__(self, "subject")
        own_attrs = subject.__dict__

        is_declared_attr = name in own_attrs and isinstance(
            own_attrs[name], decl_api.declared_attr
        )
        if not is_declared_attr:
            return getattr(subject, name)

        return own_attrs[name].fget(subject)

1314 

1315 

class LoaderCriteriaOption(CriteriaOption):
    """Add additional WHERE criteria to the load for all occurrences of
    a particular entity.

    :class:`_orm.LoaderCriteriaOption` is invoked using the
    :func:`_orm.with_loader_criteria` function; see that function for
    details.

    .. versionadded:: 1.4

    """

    __slots__ = (
        "root_entity",
        "entity",
        "deferred_where_criteria",
        "where_criteria",
        "_where_crit_orig",
        "include_aliases",
        "propagate_to_loaders",
    )

    _traverse_internals = [
        ("root_entity", visitors.ExtendedInternalTraversal.dp_plain_obj),
        ("entity", visitors.ExtendedInternalTraversal.dp_has_cache_key),
        ("where_criteria", visitors.InternalTraversal.dp_clauseelement),
        ("include_aliases", visitors.InternalTraversal.dp_boolean),
        ("propagate_to_loaders", visitors.InternalTraversal.dp_boolean),
    ]

    root_entity: Optional[Type[Any]]
    entity: Optional[_InternalEntityType[Any]]
    where_criteria: Union[ColumnElement[bool], lambdas.DeferredLambdaElement]
    deferred_where_criteria: bool
    include_aliases: bool
    propagate_to_loaders: bool

    _where_crit_orig: Any

    def __init__(
        self,
        entity_or_base: _EntityType[Any],
        where_criteria: Union[
            _ColumnExpressionArgument[bool],
            Callable[[Any], _ColumnExpressionArgument[bool]],
        ],
        loader_only: bool = False,
        include_aliases: bool = False,
        propagate_to_loaders: bool = True,
        track_closure_variables: bool = True,
    ):
        """Construct the option.

        ``entity_or_base`` is inspected without raising; when it is not
        inspectable it is kept as ``root_entity``, otherwise its inspection
        object is kept as ``entity``.  A callable ``where_criteria`` is
        wrapped in a deferred lambda element; anything else is coerced
        to a WHERE/HAVING expression immediately.  ``loader_only`` is
        accepted but not used by this constructor.

        """
        inspected = cast(
            "_InternalEntityType[Any]",
            inspection.inspect(entity_or_base, False),
        )
        if inspected is not None:
            self.root_entity = None
            self.entity = inspected
        else:
            self.root_entity = cast("Type[Any]", entity_or_base)
            self.entity = None

        # the original argument is retained for use by __reduce__
        self._where_crit_orig = where_criteria

        if not callable(where_criteria):
            self.deferred_where_criteria = False
            self.where_criteria = coercions.expect(
                roles.WhereHavingRole, where_criteria
            )
        else:
            if self.root_entity is not None:
                lambda_subject = self.root_entity
            else:
                assert inspected is not None
                lambda_subject = inspected.entity

            self.deferred_where_criteria = True
            self.where_criteria = lambdas.DeferredLambdaElement(
                where_criteria,
                roles.WhereHavingRole,
                lambda_args=(_WrapUserEntity(lambda_subject),),
                opts=lambdas.LambdaOptions(
                    track_closure_variables=track_closure_variables
                ),
            )

        self.include_aliases = include_aliases
        self.propagate_to_loaders = propagate_to_loaders

    @classmethod
    def _unreduce(
        cls, entity, where_criteria, include_aliases, propagate_to_loaders
    ):
        """Rebuild an option from the argument tuple made by __reduce__."""
        return LoaderCriteriaOption(
            entity,
            where_criteria,
            include_aliases=include_aliases,
            propagate_to_loaders=propagate_to_loaders,
        )

    def __reduce__(self):
        """Reduce to ``_unreduce`` plus the original constructor inputs."""
        if self.entity:
            target = self.entity.class_
        else:
            target = self.root_entity

        reduce_args = (
            target,
            self._where_crit_orig,
            self.include_aliases,
            self.propagate_to_loaders,
        )
        return (LoaderCriteriaOption._unreduce, reduce_args)

    def _all_mappers(self) -> Iterator[Mapper[Any]]:
        """Yield every mapper this option applies to.

        With an inspected entity these are its mapper and descendants.
        Otherwise the subclasses of ``root_entity`` are walked in
        first-in-first-out order; an inspectable subclass contributes its
        mapper and descendants, a non-inspectable one has its own
        subclasses queued.

        """
        if self.entity:
            yield from self.entity.mapper.self_and_descendants
            return

        assert self.root_entity
        pending = list(self.root_entity.__subclasses__())
        while pending:
            candidate = pending.pop(0)
            inspected = cast(
                "_InternalEntityType[Any]",
                inspection.inspect(candidate, raiseerr=False),
            )
            if not inspected:
                pending.extend(candidate.__subclasses__())
            else:
                yield from inspected.mapper.self_and_descendants

    def _should_include(self, compile_state: _ORMCompileState) -> bool:
        """Return False if the statement is annotated as being for this
        very option, True otherwise."""
        marker = compile_state.select_statement._annotations.get(
            "for_loader_criteria", None
        )
        return marker is not self

    def _resolve_where_criteria(
        self, ext_info: _InternalEntityType[Any]
    ) -> ColumnElement[bool]:
        """Return the criteria for ``ext_info``, annotated with
        "for_loader_criteria" pointing at this option.

        Deferred (lambda) criteria are resolved against
        ``ext_info.entity`` first.

        """
        if not self.deferred_where_criteria:
            crit = self.where_criteria  # type: ignore
        else:
            crit = cast(
                "ColumnElement[bool]",
                self.where_criteria._resolve_with_args(ext_info.entity),
            )
        assert isinstance(crit, ColumnElement)

        return sql_util._deep_annotate(
            crit,
            {"for_loader_criteria": self},
            detect_subquery_cols=True,
            ind_cols_on_fromclause=True,
        )

    def process_compile_state_replaced_entities(
        self,
        compile_state: _ORMCompileState,
        mapper_entities: Iterable[_MapperEntity],
    ) -> None:
        """Delegate to :meth:`process_compile_state`; ``mapper_entities``
        is not used."""
        self.process_compile_state(compile_state)

    def process_compile_state(self, compile_state: _ORMCompileState) -> None:
        """Apply a modification to a given :class:`.CompileState`."""

        # if options to limit the criteria to immediate query only,
        # use compile_state.attributes instead

        self.get_global_criteria(compile_state.global_attributes)

    def get_global_criteria(self, attributes: Dict[Any, Any]) -> None:
        """Append this option to the "additional_entity_criteria" list of
        every applicable mapper within ``attributes``."""
        for mapper in self._all_mappers():
            criteria_key = ("additional_entity_criteria", mapper)
            attributes.setdefault(criteria_key, []).append(self)

1493 

1494 

# register an inspection hook: inspecting an AliasedClass yields its
# ``_aliased_insp`` attribute
inspection._inspects(AliasedClass)(lambda target: target._aliased_insp)

1496 

1497 

@inspection._inspects(type)
def _inspect_mc(
    class_: Type[_O],
) -> Optional[Mapper[_O]]:
    """Return the mapper for ``class_``, or None if there is none.

    None is returned when the class has no class manager, when the manager
    reports the class as not mapped, or when the lookup raises one of the
    ``exc.NO_STATE`` exceptions.

    """
    try:
        manager = opt_manager_of_class(class_)
        if manager is not None and manager.is_mapped:
            return manager.mapper
    except exc.NO_STATE:
        pass
    return None

1511 

1512 

# runtime type of a subscripted typing generic such as ``List[Any]``;
# registered with the inspection system below
GenericAlias = type(List[Any])

1514 

1515 

@inspection._inspects(GenericAlias)
def _inspect_generic_alias(
    class_: Type[_O],
) -> Optional[Mapper[_O]]:
    """Inspect a subscripted generic by inspecting its origin class."""
    return _inspect_mc(cast("Type[_O]", get_origin(class_)))

1522 

1523 

@inspection._self_inspects
class Bundle(
    ORMColumnsClauseRole[_T],
    SupportsCloneAnnotations,
    MemoizedHasCacheKey,
    inspection.Inspectable["Bundle[_T]"],
    InspectionAttr,
):
    """A grouping of SQL expressions that are returned by a :class:`.Query`
    under one namespace.

    The :class:`.Bundle` essentially allows nesting of the tuple-based
    results returned by a column-oriented :class:`_query.Query` object.
    It also
    is extensible via simple subclassing, where the primary capability
    to override is that of how the set of expressions should be returned,
    allowing post-processing as well as custom return types, without
    involving ORM identity-mapped classes.

    .. seealso::

        :ref:`bundles`


    """

    single_entity = False
    """If True, queries for a single Bundle will be returned as a single
    entity, rather than an element within a keyed tuple."""

    is_clause_element = False

    is_mapper = False

    is_aliased_class = False

    is_bundle = True

    _propagate_attrs: _PropagateAttrsType = util.immutabledict()

    proxy_set = util.EMPTY_SET

    exprs: List[_ColumnsClauseElement]

    def __init__(
        self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
    ):
        r"""Construct a new :class:`.Bundle`.

        e.g.::

            bn = Bundle("mybundle", MyClass.x, MyClass.y)

            for row in session.query(bn).filter(bn.c.x == 5).filter(bn.c.y == 4):
                print(row.mybundle.x, row.mybundle.y)

        :param name: name of the bundle.
        :param \*exprs: columns or SQL expressions comprising the bundle.
        :param single_entity=False: if True, rows for this :class:`.Bundle`
         can be returned as a "single entity" outside of any enclosing tuple
         in the same manner as a mapped entity.

        """  # noqa: E501
        self.name = name
        self._label = name

        self.exprs = [
            coercions.expect(
                roles.ColumnsClauseRole, expr, apply_propagate_attrs=self
            )
            for expr in exprs
        ]

        # an expression annotated with "bundle" stands for a nested
        # Bundle; use that Bundle itself in the namespace
        members = [e._annotations.get("bundle", e) for e in self.exprs]
        namespace = ColumnCollection(
            (getattr(member, "key", member._label), member)
            for member in members
        ).as_readonly()
        self.c = namespace
        self.columns = namespace

        self.single_entity = kw.pop("single_entity", self.single_entity)

    def _gen_cache_key(
        self, anon_map: anon_map, bindparams: List[BindParameter[Any]]
    ) -> Tuple[Any, ...]:
        """Return a cache key made of the class, name, ``single_entity``
        flag and the cache key of each contained expression."""
        header = (self.__class__, self.name, self.single_entity)
        member_keys = tuple(
            [expr._gen_cache_key(anon_map, bindparams) for expr in self.exprs]
        )
        return header + member_keys

    @property
    def mapper(self) -> Optional[Mapper[Any]]:
        """The "parentmapper" annotation of the first expression, if any."""
        first = self.exprs[0]
        mp: Optional[Mapper[Any]] = first._annotations.get(
            "parentmapper", None
        )
        return mp

    @property
    def entity(self) -> Optional[_InternalEntityType[Any]]:
        """The "parententity" annotation of the first expression, if any."""
        first = self.exprs[0]
        ie: Optional[_InternalEntityType[Any]] = first._annotations.get(
            "parententity", None
        )
        return ie

    @property
    def entity_namespace(
        self,
    ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]:
        """The namespace of this bundle; same object as :attr:`.Bundle.c`."""
        return self.c

    columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]

    """A namespace of SQL expressions referred to by this :class:`.Bundle`.

        e.g.::

            bn = Bundle("mybundle", MyClass.x, MyClass.y)

            q = sess.query(bn).filter(bn.c.x == 5)

        Nesting of bundles is also supported::

            b1 = Bundle(
                "b1",
                Bundle("b2", MyClass.a, MyClass.b),
                Bundle("b3", MyClass.x, MyClass.y),
            )

            q = sess.query(b1).filter(b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9)

    .. seealso::

        :attr:`.Bundle.c`

    """  # noqa: E501

    c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
    """An alias for :attr:`.Bundle.columns`."""

    def _clone(self, **kw):
        """Return a new instance of the same class sharing a shallow copy
        of this object's ``__dict__``; ``__init__`` is not invoked."""
        bundle_cls = self.__class__
        duplicate = bundle_cls.__new__(bundle_cls)
        duplicate.__dict__.update(self.__dict__)
        return duplicate

    def __clause_element__(self):
        """Return an ungrouped clause list of this bundle's expressions,
        annotated with the bundle and set up for the "orm" plugin."""
        # ensure existing entity_namespace remains
        annotations = {"bundle": self, "entity_namespace": self}
        annotations.update(self._annotations)

        plugin_subject = self.exprs[0]._propagate_attrs.get(
            "plugin_subject", self.entity
        )

        members = [e._annotations.get("bundle", e) for e in self.exprs]
        clause_list = expression.ClauseList(
            *members,
            _literal_as_text_role=roles.ColumnsClauseRole,
            group=False,
        )
        annotated = clause_list._annotate(annotations)

        # the Bundle *must* use the orm plugin no matter what.  the
        # subject can be None but it's much better if it's not.
        return annotated._set_propagate_attrs(
            {
                "compile_state_plugin": "orm",
                "plugin_subject": plugin_subject,
            }
        )

    @property
    def clauses(self):
        """The ``clauses`` of the element from ``__clause_element__()``."""
        return self.__clause_element__().clauses

    def label(self, name):
        """Provide a copy of this :class:`.Bundle` passing a new label."""

        relabeled = self._clone()
        relabeled.name = name
        return relabeled

    def create_row_processor(
        self,
        query: Select[Unpack[TupleAny]],
        procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]],
        labels: Sequence[str],
    ) -> Callable[[Row[Unpack[TupleAny]]], Any]:
        """Produce the "row processing" function for this :class:`.Bundle`.

        May be overridden by subclasses to provide custom behaviors when
        results are fetched. The method is passed the statement object and a
        set of "row processor" functions at query execution time; these
        processor functions when given a result row will return the individual
        attribute value, which can then be adapted into any kind of return data
        structure.

        The example below illustrates replacing the usual :class:`.Row`
        return structure with a straight Python dictionary::

            from sqlalchemy.orm import Bundle


            class DictBundle(Bundle):
                def create_row_processor(self, query, procs, labels):
                    "Override create_row_processor to return values as dictionaries"

                    def proc(row):
                        return dict(zip(labels, (proc(row) for proc in procs)))

                    return proc

        A result from the above :class:`_orm.Bundle` will return dictionary
        values::

            bn = DictBundle("mybundle", MyClass.data1, MyClass.data2)
            for row in session.execute(select(bn)).where(bn.c.data1 == "d1"):
                print(row.mybundle["data1"], row.mybundle["data2"])

        """  # noqa: E501
        # one empty tuple per label is passed as the second argument
        keyed_tuple = result_tuple(labels, [() for _ in labels])

        def proc(row: Row[Unpack[TupleAny]]) -> Any:
            values = [getter(row) for getter in procs]
            return keyed_tuple(values)

        return proc

1743 

1744 

def _orm_annotate(element: _SA, exclude: Optional[Any] = None) -> _SA:
    """Return a deep-annotated copy of ``element`` in which each element
    carries the "_orm_adapt" flag.

    Elements within the exclude collection will be cloned but not annotated.

    """
    orm_adapt_flag = {"_orm_adapt": True}
    return sql_util._deep_annotate(element, orm_adapt_flag, exclude)

1753 

1754 

def _orm_deannotate(element: _SA) -> _SA:
    """Strip the annotations that tie a column to a particular mapping.

    Only "_orm_adapt" and "parententity" are removed, so the "remote" and
    "foreign" annotations passed by the :func:`_orm.foreign` and
    :func:`_orm.remote` annotators are not affected.

    """
    mapping_annotations = ("_orm_adapt", "parententity")
    return sql_util._deep_deannotate(element, values=mapping_annotations)

1767 

1768 

def _orm_full_deannotate(element: _SA) -> _SA:
    """Deep-deannotate ``element`` without restricting which annotation
    names are removed (no ``values`` filter is passed)."""
    return sql_util._deep_deannotate(element)

1771 

1772 

1773class _ORMJoin(expression.Join): 

1774 """Extend Join to support ORM constructs as input.""" 

1775 

1776 __visit_name__ = expression.Join.__visit_name__ 

1777 

1778 inherit_cache = True 

1779 

1780 def __init__( 

1781 self, 

1782 left: _FromClauseArgument, 

1783 right: _FromClauseArgument, 

1784 onclause: Optional[_OnClauseArgument] = None, 

1785 isouter: bool = False, 

1786 full: bool = False, 

1787 _left_memo: Optional[Any] = None, 

1788 _right_memo: Optional[Any] = None, 

1789 _extra_criteria: Tuple[ColumnElement[bool], ...] = (), 

1790 ): 

1791 left_info = cast( 

1792 "Union[FromClause, _InternalEntityType[Any]]", 

1793 inspection.inspect(left), 

1794 ) 

1795 

1796 right_info = cast( 

1797 "Union[FromClause, _InternalEntityType[Any]]", 

1798 inspection.inspect(right), 

1799 ) 

1800 adapt_to = right_info.selectable 

1801 

1802 # used by joined eager loader 

1803 self._left_memo = _left_memo 

1804 self._right_memo = _right_memo 

1805 

1806 if isinstance(onclause, attributes.QueryableAttribute): 

1807 if TYPE_CHECKING: 

1808 assert isinstance( 

1809 onclause.comparator, RelationshipProperty.Comparator 

1810 ) 

1811 on_selectable = onclause.comparator._source_selectable() 

1812 prop = onclause.property 

1813 _extra_criteria += onclause._extra_criteria 

1814 elif isinstance(onclause, MapperProperty): 

1815 # used internally by joined eager loader...possibly not ideal 

1816 prop = onclause 

1817 on_selectable = prop.parent.selectable 

1818 else: 

1819 prop = None 

1820 on_selectable = None 

1821 

1822 left_selectable = left_info.selectable 

1823 if prop: 

1824 adapt_from: Optional[FromClause] 

1825 if sql_util.clause_is_present(on_selectable, left_selectable): 

1826 adapt_from = on_selectable 

1827 else: 

1828 assert isinstance(left_selectable, FromClause) 

1829 adapt_from = left_selectable 

1830 

1831 ( 

1832 pj, 

1833 sj, 

1834 source, 

1835 dest, 

1836 secondary, 

1837 target_adapter, 

1838 ) = prop._create_joins( 

1839 source_selectable=adapt_from, 

1840 dest_selectable=adapt_to, 

1841 source_polymorphic=True, 

1842 of_type_entity=right_info, 

1843 alias_secondary=True, 

1844 extra_criteria=_extra_criteria, 

1845 ) 

1846 

1847 if sj is not None: 

1848 if isouter: 

1849 # note this is an inner join from secondary->right 

1850 right = sql.join(secondary, right, sj) 

1851 onclause = pj 

1852 else: 

1853 left = sql.join(left, secondary, pj, isouter) 

1854 onclause = sj 

1855 else: 

1856 onclause = pj 

1857 

1858 self._target_adapter = target_adapter 

1859 

1860 # we don't use the normal coercions logic for _ORMJoin 

1861 # (probably should), so do some gymnastics to get the entity. 

1862 # logic here is for #8721, which was a major bug in 1.4 

1863 # for almost two years, not reported/fixed until 1.4.43 (!) 

1864 if is_selectable(left_info): 

1865 parententity = left_selectable._annotations.get( 

1866 "parententity", None 

1867 ) 

1868 elif insp_is_mapper(left_info) or insp_is_aliased_class(left_info): 

1869 parententity = left_info 

1870 else: 

1871 parententity = None 

1872 

1873 if parententity is not None: 

1874 self._annotations = self._annotations.union( 

1875 {"parententity": parententity} 

1876 ) 

1877 

1878 augment_onclause = bool(_extra_criteria) and not prop 

1879 expression.Join.__init__(self, left, right, onclause, isouter, full) 

1880 

1881 assert self.onclause is not None 

1882 

1883 if augment_onclause: 

1884 self.onclause &= sql.and_(*_extra_criteria) 

1885 

1886 if ( 

1887 not prop 

1888 and getattr(right_info, "mapper", None) 

1889 and right_info.mapper.single # type: ignore 

1890 ): 

1891 right_info = cast("_InternalEntityType[Any]", right_info) 

1892 # if single inheritance target and we are using a manual 

1893 # or implicit ON clause, augment it the same way we'd augment the 

1894 # WHERE. 

1895 single_crit = right_info.mapper._single_table_criterion 

1896 if single_crit is not None: 

1897 if insp_is_aliased_class(right_info): 

1898 single_crit = right_info._adapter.traverse(single_crit) 

1899 self.onclause = self.onclause & single_crit 

1900 

def _splice_into_center(self, other):
    """Splice a join into the center.

    Given join(a, b) and join(b, c), return join(a, b).join(c)

    The leftmost non-join element of ``other`` is asserted to be the
    very same object as ``self.right``.

    """
    # follow the chain of ``.left`` attributes of ``other`` until an
    # element that is not a Join is reached
    innermost = other
    while isinstance(innermost, sql.Join):
        innermost = innermost.left

    assert self.right is innermost

    # rebuild this join so that its right side is ``other.left``,
    # keeping our own ON clause, outer flag and left memo
    spliced = _ORMJoin(
        self.left,
        other.left,
        self.onclause,
        isouter=self.isouter,
        _left_memo=self._left_memo,
        _right_memo=other._left_memo._path_registry,
    )

    # then join ``other.right`` onto that, using the ON clause, outer
    # flag and right memo taken from ``other``
    return _ORMJoin(
        spliced,
        other.right,
        other.onclause,
        isouter=other.isouter,
        _right_memo=other._right_memo,
    )

1929 

def join(
    self,
    right: _FromClauseArgument,
    onclause: Optional[_OnClauseArgument] = None,
    isouter: bool = False,
    full: bool = False,
) -> _ORMJoin:
    """Return a new ``_ORMJoin`` that has this join as its left side.

    :param right: the right side of the new join.
    :param onclause: optional ON clause, passed through unchanged.
    :param isouter: passed through as the ``isouter`` flag.
    :param full: passed through as the ``full`` flag.

    """
    return _ORMJoin(self, right, onclause, isouter=isouter, full=full)

1938 

def outerjoin(
    self,
    right: _FromClauseArgument,
    onclause: Optional[_OnClauseArgument] = None,
    full: bool = False,
) -> _ORMJoin:
    """Return a new ``_ORMJoin`` with this join as its left side and
    ``isouter`` set to True.

    :param right: the right side of the new join.
    :param onclause: optional ON clause, passed through unchanged.
    :param full: passed through as the ``full`` flag.

    """
    return _ORMJoin(self, right, onclause, full=full, isouter=True)

1946 

1947 

def with_parent(
    instance: object,
    prop: attributes.QueryableAttribute[Any],
    from_entity: Optional[_EntityType[Any]] = None,
) -> ColumnElement[bool]:
    """Create filtering criterion that relates this query's primary entity
    to the given related instance, using established
    :func:`_orm.relationship()`
    configuration.

    E.g.::

        stmt = select(Address).where(with_parent(some_user, User.addresses))

    The SQL rendered is the same as that rendered when a lazy loader
    would fire off from the given parent on that attribute, meaning
    that the appropriate state is taken from the parent object in
    Python without the need to render joins to the parent table
    in the rendered statement.

    The given property may also make use of
    :meth:`_orm.PropComparator.of_type` to indicate the left side of the
    criteria::

        a1 = aliased(Address)
        a2 = aliased(Address)
        stmt = select(a1, a2).where(
            with_parent(u1, User.addresses.of_type(a2))
        )

    The above use is equivalent to passing the ``from_entity`` argument::

        a1 = aliased(Address)
        a2 = aliased(Address)
        stmt = select(a1, a2).where(
            with_parent(u1, User.addresses, from_entity=a2)
        )

    :param instance:
      An instance which has some :func:`_orm.relationship`.

    :param prop:
      Class-bound attribute, which indicates
      what relationship from the instance should be used to reconcile the
      parent/child relationship.  Strings are rejected with
      ``ArgumentError``.

    :param from_entity:
      Entity in which to consider as the left side.  This defaults to the
      "zero" entity of the :class:`_query.Query` itself.  It is replaced
      by the ``of_type()`` entity of ``prop`` when one is present.

    """  # noqa: E501
    if isinstance(prop, str):
        raise sa_exc.ArgumentError(
            "with_parent() accepts class-bound mapped attributes, not strings"
        )

    if not isinstance(prop, attributes.QueryableAttribute):
        # any other object is used as-is; the only thing asked of it
        # is a ``_with_parent()`` method
        return prop._with_parent(instance, from_entity=from_entity)

    # an of_type() entity on the attribute takes precedence over the
    # from_entity argument
    if prop._of_type:
        from_entity = prop._of_type

    mapper_property = prop.property
    if mapper_property is None or not prop_is_relationship(mapper_property):
        raise sa_exc.ArgumentError(
            f"Expected relationship property for with_parent(), "
            f"got {mapper_property}"
        )

    return mapper_property._with_parent(instance, from_entity=from_entity)

2020 

2021 

def has_identity(object_: object) -> bool:
    """Return True if the given object has a database
    identity.

    The value is read from the ``has_identity`` attribute of the
    object's instance state.  This typically corresponds to the object
    being in either the persistent or detached state.

    .. seealso::

        :func:`.was_deleted`

    """
    return attributes.instance_state(object_).has_identity

2036 

2037 

def was_deleted(object_: object) -> bool:
    """Return True if the given object was deleted
    within a session flush.

    The value is read from the ``was_deleted`` attribute of the object's
    instance state.  This is regardless of whether or not the object is
    persistent or detached.

    .. seealso::

        :attr:`.InstanceState.was_deleted`

    """
    return attributes.instance_state(object_).was_deleted

2053 

2054 

def _entity_corresponds_to(
    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
) -> bool:
    """determine if 'given' corresponds to 'entity', in terms
    of an entity passed to Query that would match the same entity
    being referred to elsewhere in the query.

    """
    entity_is_alias = insp_is_aliased_class(entity)
    given_is_alias = insp_is_aliased_class(given)

    if entity_is_alias:
        # an aliased entity only matches another aliased entity that
        # shares the same base alias
        if not given_is_alias:
            return False
        return entity._base_alias() is given._base_alias()

    if given_is_alias:
        # entity is not aliased but given is
        if given._use_mapper_path:
            return entity in given.with_polymorphic_mappers
        return entity is given

    # neither side is an aliased class
    assert insp_is_mapper(given)
    return entity.common_parent(given)

2076 

2077 

def _entity_corresponds_to_use_path_impl(
    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
) -> bool:
    """determine if 'given' corresponds to 'entity', in terms
    of a path of loader options where a mapped attribute is taken to
    be a member of a parent entity.

    e.g.::

        someoption(A).someoption(A.b)  # -> fn(A, A) -> True
        someoption(A).someoption(C.d)  # -> fn(A, C) -> False

        a1 = aliased(A)
        someoption(a1).someoption(A.b)  # -> fn(a1, A) -> False
        someoption(a1).someoption(a1.b)  # -> fn(a1, a1) -> True

        wp = with_polymorphic(A, [A1, A2])
        someoption(wp).someoption(A1.foo)  # -> fn(wp, A1) -> False
        someoption(wp).someoption(wp.A1.foo)  # -> fn(wp, wp.A1) -> True

    """
    if insp_is_aliased_class(given):
        # given is aliased: entity must itself be an aliased class that
        # does not use the mapper path, and be either the same object
        # or one of given's with_polymorphic entities
        return (
            insp_is_aliased_class(entity)
            and not entity._use_mapper_path
            and (given is entity or entity in given._with_polymorphic_entities)
        )

    if insp_is_aliased_class(entity):
        # given is not aliased, entity is
        return (
            entity._use_mapper_path
            and given in entity.with_polymorphic_mappers
        )

    # neither is an aliased class
    return given.isa(entity.mapper)

2112 

2113 

2114def _entity_isa(given: _InternalEntityType[Any], mapper: Mapper[Any]) -> bool: 

2115 """determine if 'given' "is a" mapper, in terms of the given 

2116 would load rows of type 'mapper'. 

2117 

2118 """ 

2119 if given.is_aliased_class: 

2120 return mapper in given.with_polymorphic_mappers or given.mapper.isa( 

2121 mapper 

2122 ) 

2123 elif given.with_polymorphic_mappers: 

2124 return mapper in given.with_polymorphic_mappers or given.isa(mapper) 

2125 else: 

2126 return given.isa(mapper) 

2127 

2128 

2129def _getitem(iterable_query: Query[Any], item: Any) -> Any: 

2130 """calculate __getitem__ in terms of an iterable query object 

2131 that also has a slice() method. 

2132 

2133 """ 

2134 

2135 def _no_negative_indexes(): 

2136 raise IndexError( 

2137 "negative indexes are not accepted by SQL " 

2138 "index / slice operators" 

2139 ) 

2140 

2141 if isinstance(item, slice): 

2142 start, stop, step = util.decode_slice(item) 

2143 

2144 if ( 

2145 isinstance(stop, int) 

2146 and isinstance(start, int) 

2147 and stop - start <= 0 

2148 ): 

2149 return [] 

2150 

2151 elif (isinstance(start, int) and start < 0) or ( 

2152 isinstance(stop, int) and stop < 0 

2153 ): 

2154 _no_negative_indexes() 

2155 

2156 res = iterable_query.slice(start, stop) 

2157 if step is not None: 

2158 return list(res)[None : None : item.step] 

2159 else: 

2160 return list(res) 

2161 else: 

2162 if item == -1: 

2163 _no_negative_indexes() 

2164 else: 

2165 return list(iterable_query[item : item + 1])[0] 

2166 

2167 

def _is_mapped_annotation(
    raw_annotation: _AnnotationScanType,
    cls: Type[Any],
    originating_cls: Type[Any],
) -> bool:
    """Return True if ``raw_annotation``, once de-stringified, has
    ``_MappedAnnotationBase`` as its origin.

    The annotation is resolved against the module of ``originating_cls``.
    An annotation that cannot be resolved (``NameError``) is reported as
    not mapped rather than raising.

    """
    try:
        resolved = de_stringify_annotation(
            cls, raw_annotation, originating_cls.__module__
        )
    except NameError:
        # in most cases, at least within our own tests, we can raise
        # here, which is more accurate as it prevents us from returning
        # false negatives.  However, in the real world, try to avoid getting
        # involved with end-user annotations that have nothing to do with us.
        # see issue #8888 where we bypass using this function in the case
        # that we want to detect an unresolvable Mapped[] type.
        return False

    return is_origin_of_cls(resolved, _MappedAnnotationBase)

2187 

2188 

2189class _CleanupError(Exception): 

2190 pass 

2191 

2192 

2193def _cleanup_mapped_str_annotation( 

2194 annotation: str, originating_module: str 

2195) -> str: 

2196 # fix up an annotation that comes in as the form: 

2197 # 'Mapped[List[Address]]' so that it instead looks like: 

2198 # 'Mapped[List["Address"]]' , which will allow us to get 

2199 # "Address" as a string 

2200 

2201 # additionally, resolve symbols for these names since this is where 

2202 # we'd have to do it 

2203 

2204 inner: Optional[Match[str]] 

2205 

2206 mm = re.match(r"^([^ \|]+?)\[(.+)\]$", annotation) 

2207 

2208 if not mm: 

2209 return annotation 

2210 

2211 # ticket #8759. Resolve the Mapped name to a real symbol. 

2212 # originally this just checked the name. 

2213 try: 

2214 obj = eval_name_only(mm.group(1), originating_module) 

2215 except NameError as ne: 

2216 raise _CleanupError( 

2217 f'For annotation "{annotation}", could not resolve ' 

2218 f'container type "{mm.group(1)}". ' 

2219 "Please ensure this type is imported at the module level " 

2220 "outside of TYPE_CHECKING blocks" 

2221 ) from ne 

2222 

2223 if obj is typing.ClassVar: 

2224 real_symbol = "ClassVar" 

2225 else: 

2226 try: 

2227 if issubclass(obj, _MappedAnnotationBase): 

2228 real_symbol = obj.__name__ 

2229 else: 

2230 return annotation 

2231 except TypeError: 

2232 # avoid isinstance(obj, type) check, just catch TypeError 

2233 return annotation 

2234 

2235 # note: if one of the codepaths above didn't define real_symbol and 

2236 # then didn't return, real_symbol raises UnboundLocalError 

2237 # which is actually a NameError, and the calling routines don't 

2238 # notice this since they are catching NameError anyway. Just in case 

2239 # this is being modified in the future, something to be aware of. 

2240 

2241 stack = [] 

2242 inner = mm 

2243 while True: 

2244 stack.append(real_symbol if mm is inner else inner.group(1)) 

2245 g2 = inner.group(2) 

2246 inner = re.match(r"^([^ \|]+?)\[(.+)\]$", g2) 

2247 if inner is None: 

2248 stack.append(g2) 

2249 break 

2250 

2251 # stacks we want to rewrite, that is, quote the last entry which 

2252 # we think is a relationship class name: 

2253 # 

2254 # ['Mapped', 'List', 'Address'] 

2255 # ['Mapped', 'A'] 

2256 # 

2257 # stacks we dont want to rewrite, which are generally MappedColumn 

2258 # use cases: 

2259 # 

2260 # ['Mapped', "'Optional[Dict[str, str]]'"] 

2261 # ['Mapped', 'dict[str, str] | None'] 

2262 

2263 if ( 

2264 # avoid already quoted symbols such as 

2265 # ['Mapped', "'Optional[Dict[str, str]]'"] 

2266 not re.match(r"""^["'].*["']$""", stack[-1]) 

2267 # avoid further generics like Dict[] such as 

2268 # ['Mapped', 'dict[str, str] | None'], 

2269 # ['Mapped', 'list[int] | list[str]'], 

2270 # ['Mapped', 'Union[list[int], list[str]]'], 

2271 and not re.search(r"[\[\]]", stack[-1]) 

2272 ): 

2273 stripchars = "\"' " 

2274 stack[-1] = ", ".join( 

2275 f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",") 

2276 ) 

2277 

2278 annotation = "[".join(stack) + ("]" * (len(stack) - 1)) 

2279 

2280 return annotation 

2281 

2282 

2283def _extract_mapped_subtype( 

2284 raw_annotation: Optional[_AnnotationScanType], 

2285 cls: type, 

2286 originating_module: str, 

2287 key: str, 

2288 attr_cls: Type[Any], 

2289 required: bool, 

2290 is_dataclass_field: bool, 

2291 expect_mapped: bool = True, 

2292 raiseerr: bool = True, 

2293) -> Optional[Tuple[Union[_AnnotationScanType, str], Optional[type]]]: 

2294 """given an annotation, figure out if it's ``Mapped[something]`` and if 

2295 so, return the ``something`` part. 

2296 

2297 Includes error raise scenarios and other options. 

2298 

2299 """ 

2300 

2301 if raw_annotation is None: 

2302 if required: 

2303 raise orm_exc.MappedAnnotationError( 

2304 f"Python typing annotation is required for attribute " 

2305 f'"{cls.__name__}.{key}" when primary argument(s) for ' 

2306 f'"{attr_cls.__name__}" construct are None or not present' 

2307 ) 

2308 return None 

2309 

2310 try: 

2311 # destringify the "outside" of the annotation. note we are not 

2312 # adding include_generic so it will *not* dig into generic contents, 

2313 # which will remain as ForwardRef or plain str under future annotations 

2314 # mode. The full destringify happens later when mapped_column goes 

2315 # to do a full lookup in the registry type_annotations_map. 

2316 annotated = de_stringify_annotation( 

2317 cls, 

2318 raw_annotation, 

2319 originating_module, 

2320 str_cleanup_fn=_cleanup_mapped_str_annotation, 

2321 ) 

2322 except _CleanupError as ce: 

2323 raise orm_exc.MappedAnnotationError( 

2324 f"Could not interpret annotation {raw_annotation}. " 

2325 "Check that it uses names that are correctly imported at the " 

2326 "module level. See chained stack trace for more hints." 

2327 ) from ce 

2328 except NameError as ne: 

2329 if raiseerr and "Mapped[" in raw_annotation: # type: ignore 

2330 raise orm_exc.MappedAnnotationError( 

2331 f"Could not interpret annotation {raw_annotation}. " 

2332 "Check that it uses names that are correctly imported at the " 

2333 "module level. See chained stack trace for more hints." 

2334 ) from ne 

2335 

2336 annotated = raw_annotation # type: ignore 

2337 

2338 if is_dataclass_field: 

2339 return annotated, None 

2340 else: 

2341 if not hasattr(annotated, "__origin__") or not is_origin_of_cls( 

2342 annotated, _MappedAnnotationBase 

2343 ): 

2344 if expect_mapped: 

2345 if not raiseerr: 

2346 return None 

2347 

2348 origin = getattr(annotated, "__origin__", None) 

2349 if origin is typing.ClassVar: 

2350 return None 

2351 

2352 # check for other kind of ORM descriptor like AssociationProxy, 

2353 # don't raise for that (issue #9957) 

2354 elif isinstance(origin, type) and issubclass( 

2355 origin, ORMDescriptor 

2356 ): 

2357 return None 

2358 

2359 raise orm_exc.MappedAnnotationError( 

2360 f'Type annotation for "{cls.__name__}.{key}" ' 

2361 "can't be correctly interpreted for " 

2362 "Annotated Declarative Table form. ORM annotations " 

2363 "should normally make use of the ``Mapped[]`` generic " 

2364 "type, or other ORM-compatible generic type, as a " 

2365 "container for the actual type, which indicates the " 

2366 "intent that the attribute is mapped. " 

2367 "Class variables that are not intended to be mapped " 

2368 "by the ORM should use ClassVar[]. " 

2369 "To allow Annotated Declarative to disregard legacy " 

2370 "annotations which don't use Mapped[] to pass, set " 

2371 '"__allow_unmapped__ = True" on the class or a ' 

2372 "superclass this class.", 

2373 code="zlpr", 

2374 ) 

2375 

2376 else: 

2377 return annotated, None 

2378 

2379 if len(annotated.__args__) != 1: 

2380 raise orm_exc.MappedAnnotationError( 

2381 "Expected sub-type for Mapped[] annotation" 

2382 ) 

2383 

2384 return ( 

2385 # fix dict/list/set args to be ForwardRef, see #11814 

2386 fixup_container_fwd_refs(annotated.__args__[0]), 

2387 annotated.__origin__, 

2388 ) 

2389 

2390 

def _mapper_property_as_plain_name(prop: Type[Any]) -> str:
    """Return the result of ``util.clsname_as_plain_name()`` for ``prop``.

    When ``prop`` has a ``_mapper_property_name`` attribute, the value it
    returns when called is passed along as the name; otherwise ``None``
    is passed.

    """
    name = (
        prop._mapper_property_name()
        if hasattr(prop, "_mapper_property_name")
        else None
    )
    return util.clsname_as_plain_name(prop, name)