Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/util.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

821 statements  

1# orm/util.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9from __future__ import annotations 

10 

11import enum 

12import functools 

13import re 

14import types 

15import typing 

16from typing import AbstractSet 

17from typing import Any 

18from typing import Callable 

19from typing import cast 

20from typing import Dict 

21from typing import FrozenSet 

22from typing import Generic 

23from typing import get_origin 

24from typing import Iterable 

25from typing import Iterator 

26from typing import List 

27from typing import Literal 

28from typing import Match 

29from typing import Optional 

30from typing import Protocol 

31from typing import Sequence 

32from typing import Tuple 

33from typing import Type 

34from typing import TYPE_CHECKING 

35from typing import TypeVar 

36from typing import Union 

37import weakref 

38 

39from . import attributes # noqa 

40from . import exc 

41from . import exc as orm_exc 

42from ._typing import _O 

43from ._typing import insp_is_aliased_class 

44from ._typing import insp_is_mapper 

45from ._typing import prop_is_relationship 

46from .base import _class_to_mapper as _class_to_mapper 

47from .base import _MappedAnnotationBase 

48from .base import _never_set as _never_set # noqa: F401 

49from .base import _none_only_set as _none_only_set # noqa: F401 

50from .base import _none_set as _none_set # noqa: F401 

51from .base import attribute_str as attribute_str # noqa: F401 

52from .base import class_mapper as class_mapper 

53from .base import DynamicMapped 

54from .base import InspectionAttr as InspectionAttr 

55from .base import instance_str as instance_str # noqa: F401 

56from .base import Mapped 

57from .base import object_mapper as object_mapper 

58from .base import object_state as object_state # noqa: F401 

59from .base import opt_manager_of_class 

60from .base import ORMDescriptor 

61from .base import state_attribute_str as state_attribute_str # noqa: F401 

62from .base import state_class_str as state_class_str # noqa: F401 

63from .base import state_str as state_str # noqa: F401 

64from .base import WriteOnlyMapped 

65from .interfaces import CriteriaOption 

66from .interfaces import MapperProperty as MapperProperty 

67from .interfaces import ORMColumnsClauseRole 

68from .interfaces import ORMEntityColumnsClauseRole 

69from .interfaces import ORMFromClauseRole 

70from .path_registry import PathRegistry as PathRegistry 

71from .. import event 

72from .. import exc as sa_exc 

73from .. import inspection 

74from .. import sql 

75from .. import util 

76from ..engine.result import result_tuple 

77from ..sql import coercions 

78from ..sql import expression 

79from ..sql import lambdas 

80from ..sql import roles 

81from ..sql import util as sql_util 

82from ..sql import visitors 

83from ..sql._typing import is_selectable 

84from ..sql.annotation import SupportsCloneAnnotations 

85from ..sql.base import ColumnCollection 

86from ..sql.cache_key import HasCacheKey 

87from ..sql.cache_key import MemoizedHasCacheKey 

88from ..sql.elements import ColumnElement 

89from ..sql.elements import KeyedColumnElement 

90from ..sql.selectable import FromClause 

91from ..util.langhelpers import MemoizedSlots 

92from ..util.typing import de_stringify_annotation as _de_stringify_annotation 

93from ..util.typing import eval_name_only as _eval_name_only 

94from ..util.typing import fixup_container_fwd_refs 

95from ..util.typing import is_origin_of_cls 

96from ..util.typing import TupleAny 

97from ..util.typing import Unpack 

98 

99if typing.TYPE_CHECKING: 

100 from ._typing import _EntityType 

101 from ._typing import _IdentityKeyType 

102 from ._typing import _InternalEntityType 

103 from ._typing import _ORMCOLEXPR 

104 from .context import _MapperEntity 

105 from .context import _ORMCompileState 

106 from .mapper import Mapper 

107 from .path_registry import _AbstractEntityRegistry 

108 from .query import Query 

109 from .relationships import RelationshipProperty 

110 from ..engine import Row 

111 from ..engine import RowMapping 

112 from ..sql._typing import _CE 

113 from ..sql._typing import _ColumnExpressionArgument 

114 from ..sql._typing import _EquivalentColumnMap 

115 from ..sql._typing import _FromClauseArgument 

116 from ..sql._typing import _OnClauseArgument 

117 from ..sql._typing import _PropagateAttrsType 

118 from ..sql.annotation import _SA 

119 from ..sql.base import ReadOnlyColumnCollection 

120 from ..sql.elements import BindParameter 

121 from ..sql.selectable import _ColumnsClauseElement 

122 from ..sql.selectable import Select 

123 from ..sql.selectable import Selectable 

124 from ..sql.visitors import anon_map 

125 from ..util.typing import _AnnotationScanType 

126 

127_T = TypeVar("_T", bound=Any) 

128 

# Every string that CascadeOptions accepts as a cascade name, including
# the "all" and "none" shorthands which CascadeOptions expands / clears.
all_cascades = frozenset(
    {
        "save-update",
        "merge",
        "refresh-expire",
        "expunge",
        "delete",
        "delete-orphan",
        "all",
        "none",
    }
)

141 

# A "partial of partial": calling ``_de_stringify_partial(fn)`` returns
# ``functools.partial(fn, locals_=...)`` where ``locals_`` is an immutable
# mapping holding the ORM annotation container classes.
_de_stringify_partial = functools.partial(
    functools.partial,
    locals_=util.immutabledict(
        {
            "Mapped": Mapped,
            "WriteOnlyMapped": WriteOnlyMapped,
            "DynamicMapped": DynamicMapped,
        }
    ),
)

# functools.partial does not carry a usable signature for type checkers,
# so each wrapped function's signature is spelled out again below as a
# Protocol and applied with cast().


class _DeStringifyAnnotation(Protocol):
    """Call signature of :func:`de_stringify_annotation` after ``locals_``
    has been pre-bound."""

    def __call__(
        self,
        cls: Type[Any],
        annotation: _AnnotationScanType,
        originating_module: str,
        *,
        str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
        include_generic: bool = False,
    ) -> Type[Any]: ...


de_stringify_annotation = cast(
    _DeStringifyAnnotation,
    _de_stringify_partial(_de_stringify_annotation),
)


class _EvalNameOnly(Protocol):
    """Call signature of :func:`eval_name_only` after ``locals_`` has
    been pre-bound."""

    def __call__(self, name: str, module_name: str) -> Any: ...


eval_name_only = cast(
    _EvalNameOnly,
    _de_stringify_partial(_eval_name_only),
)

179 

180 

class CascadeOptions(FrozenSet[str]):
    """Immutable set of the cascade names given to
    :paramref:`.relationship.cascade`.

    In addition to behaving as a frozenset of strings, each individual
    cascade is exposed as a boolean attribute (``save_update``,
    ``delete``, ``refresh_expire``, ``merge``, ``expunge``,
    ``delete_orphan``).
    """

    _allowed_cascades = all_cascades

    # the cascades that the "all" shorthand expands into
    _add_w_all_cascades = all_cascades.difference(
        ["all", "none", "delete-orphan"]
    )

    _viewonly_cascades = ["expunge", "all", "none", "refresh-expire", "merge"]

    __slots__ = (
        "save_update",
        "delete",
        "refresh_expire",
        "merge",
        "expunge",
        "delete_orphan",
    )

    save_update: bool
    delete: bool
    refresh_expire: bool
    merge: bool
    expunge: bool
    delete_orphan: bool

    def __new__(
        cls, value_list: Optional[Union[Iterable[str], str]]
    ) -> CascadeOptions:
        """Build the option set from an iterable of cascade names.

        A string or ``None`` is handed off to :meth:`from_string`.

        :raises sqlalchemy.exc.ArgumentError: if any name is not one of
         the allowed cascades.
        """
        if value_list is None or isinstance(value_list, str):
            return cls.from_string(value_list)  # type: ignore

        values = set(value_list)
        unknown = values.difference(cls._allowed_cascades)
        if unknown:
            rendered = ", ".join(repr(name) for name in sorted(unknown))
            raise sa_exc.ArgumentError(
                "Invalid cascade option(s): %s" % rendered
            )

        # "all" expands to its member cascades; "none" then wipes
        # everything, so "none" wins when both are present.  Neither
        # shorthand is stored in the resulting set.
        if "all" in values:
            values.update(cls._add_w_all_cascades)
        if "none" in values:
            values.clear()
        values.discard("all")

        self = super().__new__(cls, values)
        self.delete_orphan = "delete-orphan" in values
        self.expunge = "expunge" in values
        self.merge = "merge" in values
        self.refresh_expire = "refresh-expire" in values
        self.delete = "delete" in values
        self.save_update = "save-update" in values

        if self.delete_orphan and not self.delete:
            util.warn("The 'delete-orphan' cascade option requires 'delete'.")
        return self

    def __repr__(self):
        names = ",".join(sorted(self))
        return "CascadeOptions(%r)" % (names,)

    @classmethod
    def from_string(cls, arg):
        """Build the option set from a comma-separated string.

        ``None`` and the empty string both produce an empty set; empty
        tokens between commas are ignored.
        """
        tokens = re.split(r"\s*,\s*", arg or "")
        return cls([token for token in tokens if token])

252 

253 

def _validator_events(desc, key, validator, include_removes, include_backrefs):
    """Attach a validation function to the attribute events of ``desc``.

    Listeners are established for the "append", "bulk_replace" and "set"
    events, plus "remove" when ``include_removes`` is set.

    :param desc: the event target passed to ``event.listen()``.
    :param key: attribute name; passed through to the validator and used
     to look up the attribute implementation on the state's manager.
    :param validator: callable invoked as ``validator(obj, key, value)``,
     or as ``validator(obj, key, value, is_remove)`` when
     ``include_removes`` is set.
    :param include_removes: if true, also validate on "remove" and pass
     the extra boolean argument on every call.
    :param include_backrefs: if false, events whose initiator belongs to
     a different attribute implementation than ``key``'s are passed over
     without calling the validator.
    """

    # when removes are included, the validator takes a trailing flag that
    # is False for every append / set style event
    not_a_remove = (False,) if include_removes else ()

    def detect_is_backref(state, initiator):
        impl = state.manager[key].impl
        return initiator.impl is not impl

    def append(state, value, initiator):
        # items arriving as part of a bulk replace are handled by
        # bulk_set() instead
        if initiator.op is attributes.OP_BULK_REPLACE:
            return value
        if include_backrefs or not detect_is_backref(state, initiator):
            return validator(state.obj(), key, value, *not_a_remove)
        return value

    def bulk_set(state, values, initiator):
        if not include_backrefs and detect_is_backref(state, initiator):
            return
        obj = state.obj()
        # replace contents in place so the caller's list is updated
        values[:] = [
            validator(obj, key, value, *not_a_remove) for value in values
        ]

    def set_(state, value, oldvalue, initiator):
        if include_backrefs or not detect_is_backref(state, initiator):
            return validator(state.obj(), key, value, *not_a_remove)
        return value

    event.listen(desc, "append", append, raw=True, retval=True)
    event.listen(desc, "bulk_replace", bulk_set, raw=True)
    event.listen(desc, "set", set_, raw=True, retval=True)

    if include_removes:

        def remove(state, value, initiator):
            if include_backrefs or not detect_is_backref(state, initiator):
                validator(state.obj(), key, value, True)

        event.listen(desc, "remove", remove, raw=True, retval=True)

318 

319 

def polymorphic_union(
    table_map, typecolname, aliasname="p_union", cast_nulls=True
):
    """Create a ``UNION`` statement used by a polymorphic mapper.

    See :ref:`concrete_inheritance` for an example of how
    this is used.

    Each value in ``table_map`` is coerced to a FROM clause and written
    back into ``table_map`` in place.

    :param table_map: mapping of polymorphic identities to
     :class:`_schema.Table` objects.
    :param typecolname: string name of a "discriminator" column, which will
     be derived from the query, producing the polymorphic identity for
     each row.  If ``None``, no polymorphic discriminator is generated.
    :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()`
     construct generated.
    :param cast_nulls: if True, non-existent columns, which are represented
     as labeled NULLs, will be passed into CAST.   This is a legacy
     behavior that is problematic on some backends such as Oracle - in
     which case it can be set to False.
    :raises sqlalchemy.exc.InvalidRequestError: if any table already has a
     column whose key equals ``typecolname``.

    """

    colnames: util.OrderedSet[str] = util.OrderedSet()
    columns_by_table = {}
    type_by_colname = {}

    for identity in table_map:
        table = coercions.expect(roles.FromClauseRole, table_map[identity])
        table_map[identity] = table

        by_key = {}
        for column in table.c:
            if column.key == typecolname:
                raise sa_exc.InvalidRequestError(
                    "Polymorphic union can't use '%s' as the discriminator "
                    "column due to mapped column %r; please apply the "
                    "'typecolname' "
                    "argument; this is available on "
                    "ConcreteBase as '_concrete_discriminator_name'"
                    % (typecolname, column)
                )
            colnames.add(column.key)
            by_key[column.key] = column
            type_by_colname[column.key] = column.type
        columns_by_table[table] = by_key

    def col(name, table):
        try:
            return columns_by_table[table][name]
        except KeyError:
            # the table lacks this column: stand in a typed, labeled NULL
            typed_null = sql.cast if cast_nulls else sql.type_coerce
            return typed_null(sql.null(), type_by_colname[name]).label(name)

    selects = []
    for type_, table in table_map.items():
        columns = [col(name, table) for name in colnames]
        if typecolname is not None:
            columns.append(
                sql.literal_column(sql_util._quote_ddl_expr(type_)).label(
                    typecolname
                )
            )
        selects.append(sql.select(*columns).select_from(table))

    return sql.union_all(*selects).alias(aliasname)

398 

399 

def identity_key(
    class_: Optional[Type[_T]] = None,
    ident: Union[Any, Tuple[Any, ...]] = None,
    *,
    instance: Optional[_T] = None,
    row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None,
    identity_token: Optional[Any] = None,
) -> _IdentityKeyType[_T]:
    r"""Generate "identity key" tuples, as are used as keys in the
    :attr:`.Session.identity_map` dictionary.

    Three call styles are accepted:

    * ``identity_key(class, ident, identity_token=token)``

      Given a mapped class plus a primary key, which may be a scalar or
      a tuple.

      E.g.::

        >>> identity_key(MyClass, (1, 2))
        (<class '__main__.MyClass'>, (1, 2), None)

      :param class: mapped class (must be a positional argument)
      :param ident: primary key, may be a scalar or tuple argument.
      :param identity_token: optional identity token

    * ``identity_key(instance=instance)``

      Produces the identity key for the given instance.  The instance
      does not have to be persistent; it only needs its primary key
      attributes populated (otherwise the key contains ``None`` for the
      missing values).

      E.g.::

        >>> instance = MyClass(1, 2)
        >>> identity_key(instance=instance)
        (<class '__main__.MyClass'>, (1, 2), None)

      The instance is ultimately passed to
      :meth:`_orm.Mapper.identity_key_from_instance`, which will have the
      effect of performing a database check for the corresponding row
      if the object is expired.

      :param instance: object instance (must be given as a keyword arg)

    * ``identity_key(class, row=row, identity_token=token)``

      Like the class/tuple form, but the primary key is taken from a
      database result row given as a :class:`.Row` or
      :class:`.RowMapping` object.

      E.g.::

        >>> row = engine.execute(text("select * from table where a=1 and b=2")).first()
        >>> identity_key(MyClass, row=row)
        (<class '__main__.MyClass'>, (1, 2), None)

      :param class: mapped class (must be a positional argument)
      :param row: :class:`.Row` row returned by a
       :class:`_engine.CursorResult` (must be given as a keyword arg)
      :param identity_token: optional identity token

    :raises sqlalchemy.exc.ArgumentError: if neither a class nor an
     instance is given, or if a class is given without ``ident`` or
     ``row``.

    """  # noqa: E501
    if class_ is None:
        if instance is None:
            raise sa_exc.ArgumentError("class or instance is required")
        return object_mapper(instance).identity_key_from_instance(instance)

    # a class takes precedence over ``instance`` when both are passed
    mapper = class_mapper(class_)
    if row is not None:
        return mapper.identity_key_from_row(
            row, identity_token=identity_token
        )
    if ident is None:
        raise sa_exc.ArgumentError("ident or row is required")

    primary_key = tuple(util.to_list(ident))
    return mapper.identity_key_from_primary_key(
        primary_key, identity_token=identity_token
    )

481 

482 

483class _TraceAdaptRole(enum.Enum): 

484 """Enumeration of all the use cases for ORMAdapter. 

485 

486 ORMAdapter remains one of the most complicated aspects of the ORM, as it is 

487 used for in-place adaption of column expressions to be applied to a SELECT, 

488 replacing :class:`.Table` and other objects that are mapped to classes with 

489 aliases of those tables in the case of joined eager loading, or in the case 

490 of polymorphic loading as used with concrete mappings or other custom "with 

491 polymorphic" parameters, with whole user-defined subqueries. The 

492 enumerations provide an overview of all the use cases used by ORMAdapter, a 

493 layer of formality as to the introduction of new ORMAdapter use cases (of 

494 which none are anticipated), as well as a means to trace the origins of a 

495 particular ORMAdapter within runtime debugging. 

496 

497 SQLAlchemy 2.0 has greatly scaled back ORM features which relied heavily on 

498 open-ended statement adaption, including the ``Query.with_polymorphic()`` 

499 method and the ``Query.select_from_entity()`` methods, favoring 

500 user-explicit aliasing schemes using the ``aliased()`` and 

501 ``with_polymorphic()`` standalone constructs; these still use adaption, 

502 however the adaption is applied in a narrower scope. 

503 

504 """ 

505 

506 # aliased() use that is used to adapt individual attributes at query 

507 # construction time 

508 ALIASED_INSP = enum.auto() 

509 

510 # joinedload cases; typically adapt an ON clause of a relationship 

511 # join 

512 JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto() 

513 JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto() 

514 JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto() 

515 

516 # polymorphic cases - these are complex ones that replace FROM 

517 # clauses, replacing tables with subqueries 

518 MAPPER_POLYMORPHIC_ADAPTER = enum.auto() 

519 WITH_POLYMORPHIC_ADAPTER = enum.auto() 

520 WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto() 

521 DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto() 

522 

523 # the from_statement() case, used only to adapt individual attributes 

524 # from a given statement to local ORM attributes at result fetching 

525 # time. assigned to ORMCompileState._from_obj_alias 

526 ADAPT_FROM_STATEMENT = enum.auto() 

527 

528 # the joinedload for queries that have LIMIT/OFFSET/DISTINCT case; 

529 # the query is placed inside of a subquery with the LIMIT/OFFSET/etc., 

530 # joinedloads are then placed on the outside. 

531 # assigned to ORMCompileState.compound_eager_adapter 

532 COMPOUND_EAGER_STATEMENT = enum.auto() 

533 

534 # the legacy Query._set_select_from() case. 

535 # this is needed for Query's set operations (i.e. UNION, etc. ) 

536 # as well as "legacy from_self()", which while removed from 2.0 as 

537 # public API, is used for the Query.count() method. this one 

538 # still does full statement traversal 

539 # assigned to ORMCompileState._from_obj_alias 

540 LEGACY_SELECT_FROM_ALIAS = enum.auto() 

541 

542 

class ORMStatementAdapter(sql_util.ColumnAdapter):
    """A ColumnAdapter that additionally records the
    :class:`._TraceAdaptRole` it was created for, as ``.role``."""

    __slots__ = ("role",)

    def __init__(
        self,
        role: _TraceAdaptRole,
        selectable: Selectable,
        *,
        equivalents: Optional[_EquivalentColumnMap] = None,
        adapt_required: bool = False,
        allow_label_resolve: bool = True,
        anonymize_labels: bool = False,
        adapt_on_names: bool = False,
        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
    ):
        """Store ``role`` and forward all remaining arguments, unchanged,
        to the ColumnAdapter constructor."""
        self.role = role

        adapter_options = dict(
            equivalents=equivalents,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            adapt_on_names=adapt_on_names,
            adapt_from_selectables=adapt_from_selectables,
        )
        super().__init__(selectable, **adapter_options)

570 

571 

class ORMAdapter(sql_util.ColumnAdapter):
    """ColumnAdapter subclass which, unless ``limit_on_entity`` is turned
    off, leaves alone any element that belongs to a mapper unrelated to
    the adapter's own.

    """

    __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp")

    is_aliased_class: bool
    aliased_insp: Optional[AliasedInsp[Any]]

    def __init__(
        self,
        role: _TraceAdaptRole,
        entity: _InternalEntityType[Any],
        *,
        equivalents: Optional[_EquivalentColumnMap] = None,
        adapt_required: bool = False,
        allow_label_resolve: bool = True,
        anonymize_labels: bool = False,
        selectable: Optional[Selectable] = None,
        limit_on_entity: bool = True,
        adapt_on_names: bool = False,
        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
    ):
        """Set up an adapter for ``entity``.

        :param role: the use case this adapter was created for.
        :param entity: mapper or aliased inspection object; supplies
         ``.mapper`` and the default selectable.
        :param selectable: adapt to this selectable rather than
         ``entity.selectable``.
        :param limit_on_entity: when true, :meth:`_include_fn` is passed
         as the ColumnAdapter ``include_fn``.

        The remaining keyword arguments are passed through to
        ColumnAdapter as given.
        """
        self.role = role
        self.mapper = entity.mapper

        if insp_is_aliased_class(entity):
            self.is_aliased_class = True
            self.aliased_insp = entity
        else:
            self.is_aliased_class = False
            self.aliased_insp = None

        if selectable is None:
            selectable = entity.selectable

        include_fn = self._include_fn if limit_on_entity else None

        super().__init__(
            selectable,
            equivalents,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            include_fn=include_fn,
            adapt_on_names=adapt_on_names,
            adapt_from_selectables=adapt_from_selectables,
        )

    def _include_fn(self, elem):
        """Return true if ``elem`` should be adapted: it either has no
        "parentmapper" annotation, or that mapper and this adapter's
        mapper are related via ``isa()`` in either direction."""
        parent = elem._annotations.get("parentmapper", None)
        if not parent:
            return True

        return parent.isa(self.mapper) or self.mapper.isa(parent)

623 

624 

class AliasedClass(
    inspection.Inspectable["AliasedInsp[_O]"], ORMColumnsClauseRole[_O]
):
    r"""Represents an "aliased" form of a mapped class for usage with Query.

    This is the ORM counterpart to the
    :func:`~sqlalchemy.sql.expression.alias` construct.  The object
    imitates the mapped class by way of a ``__getattr__`` scheme, while
    holding a reference to a real
    :class:`~sqlalchemy.sql.expression.Alias` object.

    The main use of :class:`.AliasedClass` is to stand in as an alternate
    within a SQL statement generated by the ORM, so that one mapped entity
    can appear in several contexts at once.  A simple example::

        # find all pairs of users with the same name
        user_alias = aliased(User)
        session.query(User, user_alias).join(
            (user_alias, User.id > user_alias.id)
        ).filter(User.name == user_alias.name)

    :class:`.AliasedClass` can also map an existing mapped class onto an
    entirely new selectable, as long as that selectable is
    column-compatible with the existing mapped selectable, and it may be
    configured in a mapping as the target of a :func:`_orm.relationship`.
    See the links below for examples.

    An :class:`.AliasedClass` is normally created with the
    :func:`_orm.aliased` function; :func:`_orm.with_polymorphic` produces
    one as well, with additional configuration.

    The object implements an attribute scheme that yields the same
    attribute and method interface as the original mapped class, which
    keeps :class:`.AliasedClass` compatible with any attribute technique
    that works on the original class, including hybrid attributes (see
    :ref:`hybrids_toplevel`).

    The underlying :class:`_orm.Mapper`, aliased selectable, and other
    information are available by passing the :class:`.AliasedClass` to
    :func:`_sa.inspect`::

        from sqlalchemy import inspect

        my_alias = aliased(MyClass)
        insp = inspect(my_alias)

    The resulting inspection object is an instance of :class:`.AliasedInsp`.


    .. seealso::

        :func:`.aliased`

        :func:`.with_polymorphic`

        :ref:`relationship_aliased_class`

        :ref:`relationship_to_window_function`


    """

    __name__: str

    def __init__(
        self,
        mapped_class_or_ac: _EntityType[_O],
        alias: Optional[FromClause] = None,
        name: Optional[str] = None,
        flat: bool = False,
        adapt_on_names: bool = False,
        with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]] = None,
        with_polymorphic_discriminator: Optional[ColumnElement[Any]] = None,
        base_alias: Optional[AliasedInsp[Any]] = None,
        use_mapper_path: bool = False,
        represents_outer_join: bool = False,
    ):
        insp = cast(
            "_InternalEntityType[_O]", inspection.inspect(mapped_class_or_ac)
        )
        mapper = insp.mapper

        nest_adapters = False

        if alias is not None:
            # an explicit selectable given for something that is itself
            # already aliased
            if insp.is_aliased_class:
                nest_adapters = True
        elif insp.is_aliased_class and insp.selectable._is_subquery:
            alias = insp.selectable.alias()
        else:
            poly_selectable = mapper._with_polymorphic_selectable
            alias = poly_selectable._anonymous_fromclause(
                name=name,
                flat=flat,
            )

        assert alias is not None

        # explicit arguments win; otherwise use the mapper's own settings
        poly_mappers = (
            with_polymorphic_mappers or mapper.with_polymorphic_mappers
        )
        if with_polymorphic_discriminator is not None:
            discriminator = with_polymorphic_discriminator
        else:
            discriminator = mapper.polymorphic_on

        self._aliased_insp = AliasedInsp(
            self,
            insp,
            alias,
            name,
            poly_mappers,
            discriminator,
            base_alias,
            use_mapper_path,
            adapt_on_names,
            represents_outer_join,
            nest_adapters,
        )

        self.__name__ = f"aliased({mapper.class_.__name__})"

    @classmethod
    def _reconstitute_from_aliased_insp(
        cls, aliased_insp: AliasedInsp[_O]
    ) -> AliasedClass[_O]:
        """Create an AliasedClass around an existing AliasedInsp without
        running ``__init__``.

        For a with-polymorphic AliasedInsp, an AliasedClass is likewise
        built for each sub-entity and set as an attribute named after
        that sub-entity's class.
        """
        obj = cls.__new__(cls)
        obj.__name__ = f"aliased({aliased_insp.mapper.class_.__name__})"
        obj._aliased_insp = aliased_insp

        if not aliased_insp._is_with_polymorphic:
            return obj

        for sub_insp in aliased_insp._with_polymorphic_entities:
            if sub_insp is aliased_insp:
                continue
            sub_entity = AliasedClass._reconstitute_from_aliased_insp(
                sub_insp
            )
            setattr(obj, sub_insp.class_.__name__, sub_entity)

        return obj

    def __getattr__(self, key: str) -> Any:
        # go through __dict__ directly: ordinary attribute access for
        # "_aliased_insp" would come back into __getattr__ if it is
        # not set yet
        try:
            _aliased_insp = self.__dict__["_aliased_insp"]
        except KeyError:
            raise AttributeError()

        # plain getattr() so that all of the target's own attribute
        # mechanics still apply
        attr = getattr(_aliased_insp._target, key)

        # a bound method: hand back the same function, re-bound so that
        # this AliasedClass is its "self"
        is_bound_method = hasattr(attr, "__call__") and hasattr(
            attr, "__self__"
        )
        if is_bound_method:
            return types.MethodType(attr.__func__, self)

        # a descriptor: invoke it with this AliasedClass as the owner
        if hasattr(attr, "__get__"):
            attr = attr.__get__(None, self)

        # members of the QueryableAttribute system need to be adapted to
        # this entity; the adapted result is stored on self so that
        # __getattr__ is not consulted for this key again
        if hasattr(attr, "adapt_to_entity"):
            attr = attr.adapt_to_entity(_aliased_insp)
            setattr(self, key, attr)

        return attr

    def _get_from_serialized(
        self, key: str, mapped_class: _O, aliased_insp: AliasedInsp[_O]
    ) -> Any:
        # only the sqlalchemy.ext.serializer extension makes use of
        # this method
        attr = getattr(mapped_class, key)

        # a bound method: re-bind the same function to this AliasedClass
        is_bound_method = hasattr(attr, "__call__") and hasattr(
            attr, "__self__"
        )
        if is_bound_method:
            return types.MethodType(attr.__func__, self)

        # a descriptor: invoke it with this AliasedClass as the owner
        if hasattr(attr, "__get__"):
            attr = attr.__get__(None, self)

        # members of the QueryableAttribute system need to be adapted;
        # the given AliasedInsp is first pointed back at this object
        if hasattr(attr, "adapt_to_entity"):
            aliased_insp._weak_entity = weakref.ref(self)
            attr = attr.adapt_to_entity(aliased_insp)
            setattr(self, key, attr)

        return attr

    def __repr__(self) -> str:
        target_name = self._aliased_insp._target.__name__
        return "<AliasedClass at 0x%x; %s>" % (id(self), target_name)

    def __str__(self) -> str:
        return str(self._aliased_insp)

825 

826 

827@inspection._self_inspects 

828class AliasedInsp( 

829 ORMEntityColumnsClauseRole[_O], 

830 ORMFromClauseRole, 

831 HasCacheKey, 

832 InspectionAttr, 

833 MemoizedSlots, 

834 inspection.Inspectable["AliasedInsp[_O]"], 

835 Generic[_O], 

836): 

837 """Provide an inspection interface for an 

838 :class:`.AliasedClass` object. 

839 

840 The :class:`.AliasedInsp` object is returned 

841 given an :class:`.AliasedClass` using the 

842 :func:`_sa.inspect` function:: 

843 

844 from sqlalchemy import inspect 

845 from sqlalchemy.orm import aliased 

846 

847 my_alias = aliased(MyMappedClass) 

848 insp = inspect(my_alias) 

849 

850 Attributes on :class:`.AliasedInsp` 

851 include: 

852 

853 * ``entity`` - the :class:`.AliasedClass` represented. 

854 * ``mapper`` - the :class:`_orm.Mapper` mapping the underlying class. 

855 * ``selectable`` - the :class:`_expression.Alias` 

856 construct which ultimately 

857 represents an aliased :class:`_schema.Table` or 

858 :class:`_expression.Select` 

859 construct. 

860 * ``name`` - the name of the alias. Also is used as the attribute 

861 name when returned in a result tuple from :class:`_query.Query`. 

862 * ``with_polymorphic_mappers`` - collection of :class:`_orm.Mapper` 

863 objects 

864 indicating all those mappers expressed in the select construct 

865 for the :class:`.AliasedClass`. 

866 * ``polymorphic_on`` - an alternate column or SQL expression which 

867 will be used as the "discriminator" for a polymorphic load. 

868 

869 .. seealso:: 

870 

871 :ref:`inspection_toplevel` 

872 

873 """ 

874 

# Per-instance attribute names.  ``__weakref__`` is listed because
# ``__init__`` takes a ``weakref.ref()`` to the instance itself when no
# base alias is supplied.
__slots__ = (
    "__weakref__",
    "_weak_entity",
    "mapper",
    "selectable",
    "name",
    "_adapt_on_names",
    "with_polymorphic_mappers",
    "polymorphic_on",
    "_use_mapper_path",
    "_base_alias",
    "represents_outer_join",
    "persist_selectable",
    "local_table",
    "_is_with_polymorphic",
    "_with_polymorphic_entities",
    "_adapter",
    "_target",
    "__clause_element__",
    "_memoized_values",
    "_all_column_expressions",
    "_nest_adapters",
)

# (attribute name, traversal symbol) pairs.
# NOTE(review): presumably consumed by cache-key generation implemented in
# a base class that is not visible in this part of the file -- confirm.
_cache_key_traversal = [
    ("name", visitors.ExtendedInternalTraversal.dp_string),
    ("_adapt_on_names", visitors.ExtendedInternalTraversal.dp_boolean),
    ("_use_mapper_path", visitors.ExtendedInternalTraversal.dp_boolean),
    ("_target", visitors.ExtendedInternalTraversal.dp_inspectable),
    ("selectable", visitors.ExtendedInternalTraversal.dp_clauseelement),
    (
        "with_polymorphic_mappers",
        visitors.InternalTraversal.dp_has_cache_key_list,
    ),
    ("polymorphic_on", visitors.InternalTraversal.dp_clauseelement),
]

# Static type declarations for attributes assigned in ``__init__``.
mapper: Mapper[_O]
selectable: FromClause
_adapter: ORMAdapter
with_polymorphic_mappers: Sequence[Mapper[Any]]
_with_polymorphic_entities: Sequence[AliasedInsp[Any]]

_weak_entity: weakref.ref[AliasedClass[_O]]
"""the AliasedClass that refers to this AliasedInsp"""

_target: Union[Type[_O], AliasedClass[_O]]
"""the thing referenced by the AliasedClass/AliasedInsp.

In the vast majority of cases, this is the mapped class. However
it may also be another AliasedClass (alias of alias).

"""

928 

def __init__(
    self,
    entity: AliasedClass[_O],
    inspected: _InternalEntityType[_O],
    selectable: FromClause,
    name: Optional[str],
    with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]],
    polymorphic_on: Optional[ColumnElement[Any]],
    _base_alias: Optional[AliasedInsp[Any]],
    _use_mapper_path: bool,
    adapt_on_names: bool,
    represents_outer_join: bool,
    nest_adapters: bool,
):
    """Populate the inspection state that backs an :class:`.AliasedClass`.

    :param entity: the :class:`.AliasedClass` served by this object;
     only a weak reference to it is kept.
    :param inspected: inspected form of the aliased target; its
     ``entity`` and ``mapper`` attributes are read.
    :param selectable: FROM clause stored as ``selectable``,
     ``persist_selectable`` and ``local_table``.
    :param name: name of the alias, may be ``None``.
    :param with_polymorphic_mappers: when non-empty, an aliased
     sub-entity is built for every listed mapper other than the primary
     one and attached to ``entity`` under the mapped class' name.
    :param polymorphic_on: stored unchanged as ``polymorphic_on``.
    :param _base_alias: base alias to reference weakly; when falsy this
     object references itself.
    :param _use_mapper_path: stored, and forwarded to sub-entities.
    :param adapt_on_names: forwarded to the adapter and to sub-entities.
    :param represents_outer_join: stored unchanged.
    :param nest_adapters: when true, ``inspected`` must be an
     :class:`.AliasedInsp` and its adapter wraps the new adapter.
    """
    target = inspected.entity
    primary = inspected.mapper

    self._weak_entity = weakref.ref(entity)
    self.mapper = primary
    self.selectable = selectable
    self.persist_selectable = selectable
    self.local_table = selectable
    self.name = name
    self.polymorphic_on = polymorphic_on
    self._base_alias = weakref.ref(_base_alias or self)
    self._use_mapper_path = _use_mapper_path
    self.represents_outer_join = represents_outer_join
    self._nest_adapters = nest_adapters

    if not with_polymorphic_mappers:
        self._is_with_polymorphic = False
        self.with_polymorphic_mappers = [primary]
    else:
        self._is_with_polymorphic = True
        self.with_polymorphic_mappers = with_polymorphic_mappers
        self._with_polymorphic_entities = []
        for sub_mapper in self.with_polymorphic_mappers:
            if sub_mapper is primary:
                continue
            sub_entity = AliasedClass(
                sub_mapper.class_,
                selectable,
                base_alias=self,
                adapt_on_names=adapt_on_names,
                use_mapper_path=_use_mapper_path,
            )
            # expose the sub-entity as an attribute of the AliasedClass
            setattr(self.entity, sub_mapper.class_.__name__, sub_entity)
            self._with_polymorphic_entities.append(sub_entity._aliased_insp)

    equivalents = primary._equivalent_columns
    # make sure the adapter doesn't try to grab other tables that
    # are not even the thing we are mapping, such as embedded
    # selectables in subqueries or CTEs.  See issue #6060
    if adapt_on_names:
        adapt_from = set()
    else:
        adapt_from = {m.selectable for m in self.with_polymorphic_mappers}

    self._adapter = ORMAdapter(
        _TraceAdaptRole.ALIASED_INSP,
        primary,
        selectable=selectable,
        equivalents=equivalents,
        adapt_on_names=adapt_on_names,
        anonymize_labels=True,
        adapt_from_selectables=adapt_from,
        limit_on_entity=False,
    )

    if nest_adapters:
        # supports "aliased class of aliased class" use case
        assert isinstance(inspected, AliasedInsp)
        self._adapter = inspected._adapter.wrap(self._adapter)

    self._adapt_on_names = adapt_on_names
    self._target = target

1004 

@classmethod
def _alias_factory(
    cls,
    element: Union[_EntityType[_O], FromClause],
    alias: Optional[FromClause] = None,
    name: Optional[str] = None,
    flat: bool = False,
    adapt_on_names: bool = False,
) -> Union[AliasedClass[_O], FromClause]:
    """Return an alias of ``element``.

    A :class:`.FromClause` is aliased directly (named when ``name`` is
    truthy, anonymously otherwise); anything else is wrapped in an
    :class:`.AliasedClass`.

    :raises sqlalchemy.exc.ArgumentError: if ``adapt_on_names`` is set
     while ``element`` is a :class:`.FromClause`.
    """
    if not isinstance(element, FromClause):
        return AliasedClass(
            element,
            alias=alias,
            flat=flat,
            name=name,
            adapt_on_names=adapt_on_names,
        )

    if adapt_on_names:
        raise sa_exc.ArgumentError(
            "adapt_on_names only applies to ORM elements"
        )

    if name:
        return element.alias(name=name, flat=flat)

    return coercions.expect(
        roles.AnonymizedFromClauseRole, element, flat=flat
    )

1033 

@classmethod
def _with_polymorphic_factory(
    cls,
    base: Union[Type[_O], Mapper[_O]],
    classes: Union[Literal["*"], Iterable[_EntityType[Any]]],
    selectable: Union[Literal[False, None], FromClause] = False,
    flat: bool = False,
    polymorphic_on: Optional[ColumnElement[Any]] = None,
    aliased: bool = False,
    innerjoin: bool = False,
    adapt_on_names: bool = False,
    name: Optional[str] = None,
    _use_mapper_path: bool = False,
) -> AliasedClass[_O]:
    """Build the :class:`.AliasedClass` for a ``with_polymorphic()`` request.

    The mapper of ``base`` resolves ``classes`` / ``selectable`` into a
    list of mappers and a selectable; that selectable is replaced by an
    anonymous alias when ``aliased`` or ``flat`` is set.

    :raises sqlalchemy.exc.ArgumentError: if an explicit ``selectable``
     is combined with ``flat``.
    """
    primary_mapper = _class_to_mapper(base)

    has_explicit_selectable = selectable not in (None, False)
    if has_explicit_selectable and flat:
        raise sa_exc.ArgumentError(
            "the 'flat' and 'selectable' arguments cannot be passed "
            "simultaneously to with_polymorphic()"
        )

    poly_mappers, poly_selectable = primary_mapper._with_polymorphic_args(
        classes, selectable, innerjoin=innerjoin
    )
    if aliased or flat:
        assert poly_selectable is not None
        poly_selectable = poly_selectable._anonymous_fromclause(flat=flat)

    return AliasedClass(
        base,
        poly_selectable,
        name=name,
        with_polymorphic_mappers=poly_mappers,
        adapt_on_names=adapt_on_names,
        with_polymorphic_discriminator=polymorphic_on,
        use_mapper_path=_use_mapper_path,
        represents_outer_join=not innerjoin,
    )

1073 

@property
def entity(self) -> AliasedClass[_O]:
    """The :class:`.AliasedClass` associated with this object.

    The class is only referenced weakly in order to avoid reference
    cycles, so it can disappear -- typically when it was created
    internally and only the inspection object was passed around.  In
    that case a replacement is rebuilt from this object and remembered
    (again weakly) before being returned.
    """
    aliased_cls = self._weak_entity()
    if aliased_cls is not None:
        return aliased_cls

    aliased_cls = AliasedClass._reconstitute_from_aliased_insp(self)
    self._weak_entity = weakref.ref(aliased_cls)
    return aliased_cls

1087 

1088 is_aliased_class = True 

1089 "always returns True" 

1090 

def _memoized_method___clause_element__(self) -> FromClause:
    """Return ``self.selectable`` annotated with this entity.

    The annotations name the mapper, the parent entity and the entity
    namespace; the propagated attributes select the ``"orm"`` compile
    state plugin with this object as the plugin subject.
    """
    entity_annotations = {
        "parentmapper": self.mapper,
        "parententity": self,
        "entity_namespace": self,
    }
    annotated = self.selectable._annotate(entity_annotations)
    return annotated._set_propagate_attrs(
        {"compile_state_plugin": "orm", "plugin_subject": self}
    )

1101 

@property
def entity_namespace(self) -> AliasedClass[_O]:
    """Namespace used for attribute lookup; same object as ``entity``."""
    namespace = self.entity
    return namespace

1105 

@property
def class_(self) -> Type[_O]:
    """The mapped class ultimately represented by this
    :class:`.AliasedInsp`, i.e. the ``class_`` of its mapper."""
    mapped_cls = self.mapper.class_
    return mapped_cls

1111 

@property
def _path_registry(self) -> _AbstractEntityRegistry:
    """Path registry for this entity.

    The mapper's own registry is returned when ``_use_mapper_path`` is
    set; otherwise a registry is obtained per entity via
    ``PathRegistry.per_mapper()``.
    """
    if not self._use_mapper_path:
        return PathRegistry.per_mapper(self)
    return self.mapper._path_registry

1118 

def __getstate__(self) -> Dict[str, Any]:
    """Return the pickle state: the values ``__setstate__`` needs in
    order to call ``__init__`` again.

    The weakly held objects (``entity``, ``_base_alias``) are
    dereferenced so that the state holds the objects themselves.
    """
    state: Dict[str, Any] = dict(
        entity=self.entity,
        mapper=self.mapper,
        alias=self.selectable,
        name=self.name,
        adapt_on_names=self._adapt_on_names,
        with_polymorphic_mappers=self.with_polymorphic_mappers,
        with_polymorphic_discriminator=self.polymorphic_on,
        base_alias=self._base_alias(),
        use_mapper_path=self._use_mapper_path,
        represents_outer_join=self.represents_outer_join,
        nest_adapters=self._nest_adapters,
    )
    return state

1133 

def __setstate__(self, state: Dict[str, Any]) -> None:
    """Restore from pickle state by re-running ``__init__``.

    The state keys are listed below in the positional order of the
    ``__init__`` parameters.
    """
    init_arg_keys = (
        "entity",
        "mapper",
        "alias",
        "name",
        "with_polymorphic_mappers",
        "with_polymorphic_discriminator",
        "base_alias",
        "use_mapper_path",
        "adapt_on_names",
        "represents_outer_join",
        "nest_adapters",
    )
    init_args = [state[key] for key in init_arg_keys]
    self.__init__(*init_args)  # type: ignore

1148 

def _merge_with(self, other: AliasedInsp[_O]) -> AliasedInsp[_O]:
    """Combine the polymorphic classes of this entity and ``other``.

    Both must share the same primary mapper.  ``other`` itself is
    returned when the two cover the same set of classes; otherwise a
    new flat, anonymously aliased with-polymorphic entity spanning the
    union is built using ``other``'s discriminator, mapper-path flag
    and outer-join flag.
    """
    # NOTE: neither side is required to be a with_polymorphic() entity

    primary_mapper = other.mapper
    assert self.mapper is primary_mapper

    own_classes = util.to_set(
        mp.class_ for mp in self.with_polymorphic_mappers
    )
    other_classes = {mp.class_ for mp in other.with_polymorphic_mappers}
    if own_classes == other_classes:
        return other

    combined = own_classes | other_classes
    outer = other.represents_outer_join

    mappers, selectable = primary_mapper._with_polymorphic_args(
        combined, None, innerjoin=not outer
    )
    selectable = selectable._anonymous_fromclause(flat=True)
    merged = AliasedClass(
        primary_mapper,
        selectable,
        with_polymorphic_mappers=mappers,
        with_polymorphic_discriminator=other.polymorphic_on,
        use_mapper_path=other._use_mapper_path,
        represents_outer_join=other.represents_outer_join,
    )
    return merged._aliased_insp

1178 

def _adapt_element(
    self, expr: _ORMCOLEXPR, key: Optional[str] = None
) -> _ORMCOLEXPR:
    """Adapt a column expression to this alias and annotate it.

    :param expr: the column expression to adapt; must be a
     :class:`.ColumnElement`.
    :param key: attribute name; when truthy it is recorded under the
     ``proxy_key`` annotation and quoted in the warning below.
    :return: the adapted expression -- or ``expr`` itself, after a
     warning, when the adapter does not find it in the selectable --
     annotated with this entity and set up for the ``"orm"`` plugin.
    """
    assert isinstance(expr, ColumnElement)

    annotations: Dict[str, Any] = {
        "parententity": self,
        "parentmapper": self.mapper,
    }
    if key:
        annotations["proxy_key"] = key

    # userspace adapt of an attribute from AliasedClass; validate that
    # it actually was present
    result = self._adapter.adapt_check_present(expr)
    if result is None:
        result = expr
        if self._adapter.adapt_on_names:
            message = (
                "Did not locate an expression in selectable for "
                "attribute %r; ensure name is correct in expression"
            )
        else:
            message = (
                "Did not locate an expression in selectable for "
                "attribute %r; to match by name, use the "
                "adapt_on_names parameter"
            )
        util.warn_limited(message, (key,))

    annotated = result._annotate(annotations)
    return annotated._set_propagate_attrs(
        {"compile_state_plugin": "orm", "plugin_subject": self}
    )

if TYPE_CHECKING:
    # typing-only signature that keeps this class compatible with the
    # _ORMAdapterProto protocol (and through it _CoreAdapterProto)

    def _orm_adapt_element(
        self,
        obj: _CE,
        key: Optional[str] = None,
    ) -> _CE: ...

else:
    # at runtime the protocol method is simply the same function
    _orm_adapt_element = _adapt_element

1225 

def _entity_for_mapper(self, mapper):
    """Return the inspected entity that stands for ``mapper`` here.

    :param mapper: a mapper expected to be related to ``self.mapper``.
    :return: ``self`` when ``mapper`` is the primary mapper, or is not
     one of the polymorphic mappers but reports ``isa(self.mapper)``;
     otherwise the ``_aliased_insp`` of the sub-entity stored on the
     aliased class under the mapped class' name.
    :raises AssertionError: if ``mapper`` is unrelated to this entity.
    """
    if mapper in self.with_polymorphic_mappers:
        if mapper is self.mapper:
            return self
        # sub-entities are attached to the AliasedClass by class name
        sub_entity = getattr(self.entity, mapper.class_.__name__)
        return sub_entity._aliased_insp

    if mapper.isa(self.mapper):
        return self

    # raised explicitly rather than via ``assert False`` so that the
    # failure is not compiled away (yielding None) under ``python -O``
    raise AssertionError(
        "mapper %s doesn't correspond to %s" % (mapper, self)
    )

1239 

def _memoized_attr__get_clause(self):
    """Return the mapper's ``_get_clause`` pair adapted to this alias.

    Both the ON clause and every key of the replacement map are passed
    through the adapter's ``traverse()``; the mapped values are kept.
    """
    onclause, replacemap = self.mapper._get_clause
    adapted_onclause = self._adapter.traverse(onclause)

    adapted_map = {}
    for col, param in replacemap.items():
        adapted_map[self._adapter.traverse(col)] = param

    return (adapted_onclause, adapted_map)

1249 

def _memoized_attr__memoized_values(self):
    """Return a new, empty dict; ``_memo()`` stores its values in the
    ``_memoized_values`` attribute."""
    return dict()

1252 

def _memoized_attr__all_column_expressions(self):
    """Return a :class:`.ColumnCollection` of every mapped column,
    adapted to this alias.

    For a with-polymorphic entity the mappers of the polymorphic
    sub-entities are passed to the mapper's ``_columns_plus_keys()``.
    """
    if not self._is_with_polymorphic:
        keyed_columns = self.mapper._columns_plus_keys()
    else:
        sub_mappers = [
            insp.mapper for insp in self._with_polymorphic_entities
        ]
        keyed_columns = self.mapper._columns_plus_keys(sub_mappers)

    adapted_columns = []
    for key, col in keyed_columns:
        adapted_columns.append((key, self._adapt_element(col)))

    return ColumnCollection(adapted_columns)

1266 

def _memo(self, key, callable_, *args, **kw):
    """Return the value memoized under ``key``, computing it on a miss.

    :param key: hashable key into ``self._memoized_values``.
    :param callable_: invoked as ``callable_(*args, **kw)`` only when
     ``key`` is not stored yet; its result is stored and returned.
    """
    if key not in self._memoized_values:
        computed = callable_(*args, **kw)
        self._memoized_values[key] = computed
        return computed

    return self._memoized_values[key]

1273 

def __repr__(self):
    """Debug representation: object id, mapped class name and, when
    present, the names of the polymorphic mapper classes."""
    poly_mappers = self.with_polymorphic_mappers
    if poly_mappers:
        class_names = ", ".join(
            mp.class_.__name__ for mp in self.with_polymorphic_mappers
        )
        with_poly = "(%s)" % class_names
    else:
        with_poly = ""

    parts = (id(self), self.class_.__name__, with_poly)
    return "<AliasedInsp at 0x%x; %s%s>" % parts

1286 

def __str__(self):
    """Readable form that mirrors the constructing call:
    ``aliased(X)`` or ``with_polymorphic(X, [A, B])``."""
    if not self._is_with_polymorphic:
        return "aliased(%s)" % (self._target.__name__,)

    target_name = self._target.__name__
    # the primary mapper is left out of the listed sub-classes
    sub_names = ", ".join(
        mp.class_.__name__
        for mp in self.with_polymorphic_mappers
        if mp is not self.mapper
    )
    return "with_polymorphic(%s, [%s])" % (target_name, sub_names)

1299 

1300 

class _WrapUserEntity:
    """Proxy handed to the ``loader_criteria`` lambda in place of the
    user's entity.

    It lets attribute access bypass ``declared_attr`` descriptors on
    unmapped mixins, which normally emit a warning for such use.  It
    might also be useful for other per-lambda instrumentations should
    the need arise.

    """

    __slots__ = ("subject",)

    def __init__(self, subject):
        self.subject = subject

    @util.preload_module("sqlalchemy.orm.decl_api")
    def __getattribute__(self, name):
        """Resolve ``name`` against the wrapped subject.

        A ``declared_attr`` found directly in the subject's ``__dict__``
        is invoked through its ``fget``; everything else is an ordinary
        ``getattr()`` on the subject.
        """
        decl_api = util.preloaded.orm.decl_api

        # go through object.__getattribute__ to avoid recursing into
        # this very method
        wrapped = object.__getattribute__(self, "subject")
        own_attrs = wrapped.__dict__

        if name not in own_attrs:
            return getattr(wrapped, name)

        candidate = own_attrs[name]
        if isinstance(candidate, decl_api.declared_attr):
            return own_attrs[name].fget(wrapped)
        return getattr(wrapped, name)

1327 

1328 

class LoaderCriteriaOption(CriteriaOption):
    """Loader option that adds additional WHERE criteria to the load for
    all occurrences of a particular entity.

    Instances are produced by the :func:`_orm.with_loader_criteria`
    function; see that function for details.

    .. versionadded:: 1.4

    """

    __slots__ = (
        "root_entity",
        "entity",
        "deferred_where_criteria",
        "where_criteria",
        "_where_crit_orig",
        "include_aliases",
        "propagate_to_loaders",
    )

    _traverse_internals = [
        ("root_entity", visitors.ExtendedInternalTraversal.dp_plain_obj),
        ("entity", visitors.ExtendedInternalTraversal.dp_has_cache_key),
        ("where_criteria", visitors.InternalTraversal.dp_clauseelement),
        ("include_aliases", visitors.InternalTraversal.dp_boolean),
        ("propagate_to_loaders", visitors.InternalTraversal.dp_boolean),
    ]

    root_entity: Optional[Type[Any]]
    entity: Optional[_InternalEntityType[Any]]
    where_criteria: Union[ColumnElement[bool], lambdas.DeferredLambdaElement]
    deferred_where_criteria: bool
    include_aliases: bool
    propagate_to_loaders: bool

    _where_crit_orig: Any

    def __init__(
        self,
        entity_or_base: _EntityType[Any],
        where_criteria: Union[
            _ColumnExpressionArgument[bool],
            Callable[[Any], _ColumnExpressionArgument[bool]],
        ],
        loader_only: bool = False,
        include_aliases: bool = False,
        propagate_to_loaders: bool = True,
        track_closure_variables: bool = True,
    ):
        """Construct the option.

        :param entity_or_base: an inspectable entity, or any other
         class; the latter is kept as ``root_entity`` and its
         subclasses are searched for mapped entities later on.
        :param where_criteria: a SQL expression, or a callable that
         produces one; a callable is wrapped in a deferred lambda
         element and resolved per entity.
        :param loader_only: accepted, but not used by this constructor.
        :param include_aliases: stored unchanged.
        :param propagate_to_loaders: stored unchanged.
        :param track_closure_variables: forwarded to the lambda options
         when ``where_criteria`` is a callable.
        """
        inspected = cast(
            "_InternalEntityType[Any]",
            inspection.inspect(entity_or_base, False),
        )
        if inspected is not None:
            self.root_entity = None
            self.entity = inspected
        else:
            self.root_entity = cast("Type[Any]", entity_or_base)
            self.entity = None

        # kept verbatim so that __reduce__ can hand it back to _unreduce
        self._where_crit_orig = where_criteria

        if not callable(where_criteria):
            self.deferred_where_criteria = False
            self.where_criteria = coercions.expect(
                roles.WhereHavingRole, where_criteria
            )
        else:
            if self.root_entity is None:
                assert inspected is not None
                lambda_subject = inspected.entity
            else:
                lambda_subject = self.root_entity

            self.deferred_where_criteria = True
            self.where_criteria = lambdas.DeferredLambdaElement(
                where_criteria,
                roles.WhereHavingRole,
                lambda_args=(_WrapUserEntity(lambda_subject),),
                opts=lambdas.LambdaOptions(
                    track_closure_variables=track_closure_variables
                ),
            )

        self.include_aliases = include_aliases
        self.propagate_to_loaders = propagate_to_loaders

    @classmethod
    def _unreduce(
        cls, entity, where_criteria, include_aliases, propagate_to_loaders
    ):
        """Rebuild an option from the arguments produced by
        ``__reduce__``."""
        return LoaderCriteriaOption(
            entity,
            where_criteria,
            include_aliases=include_aliases,
            propagate_to_loaders=propagate_to_loaders,
        )

    def __reduce__(self):
        """Pickle support: reduce to a call of ``_unreduce`` with the
        entity's class (or the root entity), the original criteria and
        the two boolean flags."""
        if self.entity:
            target = self.entity.class_
        else:
            target = self.root_entity

        reduce_args = (
            target,
            self._where_crit_orig,
            self.include_aliases,
            self.propagate_to_loaders,
        )
        return (LoaderCriteriaOption._unreduce, reduce_args)

    def _all_mappers(self) -> Iterator[Mapper[Any]]:
        """Yield every mapper the criteria applies to.

        With an inspected entity these are its mapper and all
        descendants.  With a plain root class, the subclass tree is
        walked breadth-first; descent stops at each subclass that
        inspects to an entity, whose mapper hierarchy is yielded.
        """
        if self.entity:
            yield from self.entity.mapper.self_and_descendants
            return

        assert self.root_entity
        pending = list(self.root_entity.__subclasses__())
        while pending:
            candidate = pending.pop(0)
            insp = cast(
                "_InternalEntityType[Any]",
                inspection.inspect(candidate, raiseerr=False),
            )
            if not insp:
                pending.extend(candidate.__subclasses__())
                continue
            yield from insp.mapper.self_and_descendants

    def _should_include(self, compile_state: _ORMCompileState) -> bool:
        """Return False only when the statement being compiled carries
        this very option under its ``for_loader_criteria`` annotation."""
        marker = compile_state.select_statement._annotations.get(
            "for_loader_criteria", None
        )
        return marker is not self

    def _resolve_where_criteria(
        self, ext_info: _InternalEntityType[Any]
    ) -> ColumnElement[bool]:
        """Return the criteria for ``ext_info``, deep-annotated with
        this option under ``for_loader_criteria``.

        Deferred (lambda) criteria are first resolved against
        ``ext_info.entity``.
        """
        if not self.deferred_where_criteria:
            crit = self.where_criteria  # type: ignore
        else:
            crit = cast(
                "ColumnElement[bool]",
                self.where_criteria._resolve_with_args(ext_info.entity),
            )

        assert isinstance(crit, ColumnElement)
        return sql_util._deep_annotate(
            crit,
            {"for_loader_criteria": self},
            detect_subquery_cols=True,
            ind_cols_on_fromclause=True,
        )

    def process_compile_state_replaced_entities(
        self,
        compile_state: _ORMCompileState,
        mapper_entities: Iterable[_MapperEntity],
    ) -> None:
        """Same handling as ``process_compile_state``; the replaced
        entities are not consulted."""
        self.process_compile_state(compile_state)

    def process_compile_state(self, compile_state: _ORMCompileState) -> None:
        """Apply a modification to a given :class:`.CompileState`."""

        # if options to limit the criteria to immediate query only,
        # use compile_state.attributes instead

        self.get_global_criteria(compile_state.global_attributes)

    def get_global_criteria(self, attributes: Dict[Any, Any]) -> None:
        """Append this option to the ``("additional_entity_criteria",
        mapper)`` list in ``attributes`` for every applicable mapper."""
        for mp in self._all_mappers():
            criteria_key = ("additional_entity_criteria", mp)
            attributes.setdefault(criteria_key, []).append(self)

1506 

1507 

# Register an inspection callable for AliasedClass: it returns the
# ``_aliased_insp`` attribute of the object passed to it.
# NOTE(review): presumably this is what makes inspect(<AliasedClass>)
# return its AliasedInsp -- confirm against the inspection module.
inspection._inspects(AliasedClass)(lambda target: target._aliased_insp)

1509 

1510 

@inspection._inspects(type)
def _inspect_mc(
    class_: Type[_O],
) -> Optional[Mapper[_O]]:
    """Inspection callable registered for plain classes.

    Returns the class' mapper, or ``None`` when the class has no class
    manager, is not mapped, or the lookup raises ``exc.NO_STATE``.
    """
    try:
        manager = opt_manager_of_class(class_)
        if manager is not None and manager.is_mapped:
            return manager.mapper
    except exc.NO_STATE:
        pass
    return None

1524 

1525 

# runtime type of subscripted generics such as ``List[Any]``
GenericAlias = type(List[Any])


@inspection._inspects(GenericAlias)
def _inspect_generic_alias(
    class_: Type[_O],
) -> Optional[Mapper[_O]]:
    """Inspection callable registered for subscripted generics: the
    generic's origin class is inspected in its place."""
    return _inspect_mc(cast("Type[_O]", get_origin(class_)))

1535 

1536 

@inspection._self_inspects
class Bundle(
    ORMColumnsClauseRole[_T],
    SupportsCloneAnnotations,
    MemoizedHasCacheKey,
    inspection.Inspectable["Bundle[_T]"],
    InspectionAttr,
):
    """A grouping of SQL expressions that are returned by a :class:`.Query`
    under one namespace.

    The :class:`.Bundle` essentially allows nesting of the tuple-based
    results returned by a column-oriented :class:`_query.Query` object.
    It also
    is extensible via simple subclassing, where the primary capability
    to override is that of how the set of expressions should be returned,
    allowing post-processing as well as custom return types, without
    involving ORM identity-mapped classes.

    .. seealso::

        :ref:`bundles`


    """

    single_entity = False
    """If True, queries for a single Bundle will be returned as a single
    entity, rather than an element within a keyed tuple."""

    is_clause_element = False

    is_mapper = False

    is_aliased_class = False

    is_bundle = True

    _propagate_attrs: _PropagateAttrsType = util.immutabledict()

    proxy_set = util.EMPTY_SET

    exprs: List[_ColumnsClauseElement]

    def __init__(
        self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
    ):
        r"""Construct a new :class:`.Bundle`.

        e.g.::

            bn = Bundle("mybundle", MyClass.x, MyClass.y)

            for row in session.query(bn).filter(bn.c.x == 5).filter(bn.c.y == 4):
                print(row.mybundle.x, row.mybundle.y)

        :param name: name of the bundle.
        :param \*exprs: columns or SQL expressions comprising the bundle.
        :param single_entity=False: if True, rows for this :class:`.Bundle`
         can be returned as a "single entity" outside of any enclosing tuple
         in the same manner as a mapped entity.

        """  # noqa: E501
        self.name = self._label = name
        coerced_exprs = [
            coercions.expect(
                roles.ColumnsClauseRole, expr, apply_propagate_attrs=self
            )
            for expr in exprs
        ]
        self.exprs = coerced_exprs

        # a nested Bundle is stored under its ``bundle`` annotation and
        # has no ``key``, so it is keyed by its ``_label``
        self.c = self.columns = ColumnCollection(
            (getattr(col, "key", col._label), col)
            for col in [e._annotations.get("bundle", e) for e in coerced_exprs]
        ).as_readonly()
        # only ``single_entity`` is read from the keyword arguments
        self.single_entity = kw.pop("single_entity", self.single_entity)

    def _gen_cache_key(
        self, anon_map: anon_map, bindparams: List[BindParameter[Any]]
    ) -> Tuple[Any, ...]:
        """Cache key: class, name, ``single_entity`` flag, followed by
        the cache key of every expression."""
        return (self.__class__, self.name, self.single_entity) + tuple(
            [expr._gen_cache_key(anon_map, bindparams) for expr in self.exprs]
        )

    @property
    def mapper(self) -> Optional[Mapper[Any]]:
        """The ``parentmapper`` annotation of the first expression, or
        ``None`` when that expression carries none."""
        mp: Optional[Mapper[Any]] = self.exprs[0]._annotations.get(
            "parentmapper", None
        )
        return mp

    @property
    def entity(self) -> Optional[_InternalEntityType[Any]]:
        """The ``parententity`` annotation of the first expression, or
        ``None`` when that expression carries none."""
        ie: Optional[_InternalEntityType[Any]] = self.exprs[
            0
        ]._annotations.get("parententity", None)
        return ie

    @property
    def entity_namespace(
        self,
    ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]:
        """Namespace used for attribute lookup; the same collection as
        :attr:`.Bundle.c`."""
        return self.c

    columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]

    """A namespace of SQL expressions referred to by this :class:`.Bundle`.

        e.g.::

            bn = Bundle("mybundle", MyClass.x, MyClass.y)

            q = sess.query(bn).filter(bn.c.x == 5)

        Nesting of bundles is also supported::

            b1 = Bundle(
                "b1",
                Bundle("b2", MyClass.a, MyClass.b),
                Bundle("b3", MyClass.x, MyClass.y),
            )

            q = sess.query(b1).filter(b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9)

    .. seealso::

        :attr:`.Bundle.c`

    """  # noqa: E501

    c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
    """An alias for :attr:`.Bundle.columns`."""

    def _clone(self, **kw):
        """Return a shallow copy: a new instance of the same class
        sharing this instance's ``__dict__`` contents."""
        cloned = self.__class__.__new__(self.__class__)
        cloned.__dict__.update(self.__dict__)
        return cloned

    def __clause_element__(self):
        """Return a ``ClauseList`` of the bundle's expressions, annotated
        with this bundle and set up for the ``"orm"`` plugin."""
        # ensure existing entity_namespace remains
        annotations = {"bundle": self, "entity_namespace": self}
        annotations.update(self._annotations)

        plugin_subject = self.exprs[0]._propagate_attrs.get(
            "plugin_subject", self.entity
        )
        return (
            expression.ClauseList(
                _literal_as_text_role=roles.ColumnsClauseRole,
                group=False,
                *[e._annotations.get("bundle", e) for e in self.exprs],
            )
            ._annotate(annotations)
            ._set_propagate_attrs(
                # the Bundle *must* use the orm plugin no matter what.  the
                # subject can be None but it's much better if it's not.
                {
                    "compile_state_plugin": "orm",
                    "plugin_subject": plugin_subject,
                }
            )
        )

    @property
    def clauses(self):
        """The ``clauses`` of the ``ClauseList`` built by
        ``__clause_element__()``."""
        return self.__clause_element__().clauses

    def label(self, name):
        """Provide a copy of this :class:`.Bundle` passing a new label."""

        cloned = self._clone()
        # ``__init__`` sets ``name`` and ``_label`` together, and an
        # enclosing Bundle keys nested bundles by ``_label``; keep both
        # in step so a relabelled bundle is not keyed by its old name.
        cloned.name = cloned._label = name
        return cloned

    def create_row_processor(
        self,
        query: Select[Unpack[TupleAny]],
        procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]],
        labels: Sequence[str],
    ) -> Callable[[Row[Unpack[TupleAny]]], Any]:
        """Produce the "row processing" function for this :class:`.Bundle`.

        May be overridden by subclasses to provide custom behaviors when
        results are fetched. The method is passed the statement object and a
        set of "row processor" functions at query execution time; these
        processor functions when given a result row will return the individual
        attribute value, which can then be adapted into any kind of return data
        structure.

        The example below illustrates replacing the usual :class:`.Row`
        return structure with a straight Python dictionary::

            from sqlalchemy.orm import Bundle


            class DictBundle(Bundle):
                def create_row_processor(self, query, procs, labels):
                    "Override create_row_processor to return values as dictionaries"

                    def proc(row):
                        return dict(zip(labels, (proc(row) for proc in procs)))

                    return proc

        A result from the above :class:`_orm.Bundle` will return dictionary
        values::

            bn = DictBundle("mybundle", MyClass.data1, MyClass.data2)
            for row in session.execute(select(bn).where(bn.c.data1 == "d1")):
                print(row.mybundle["data1"], row.mybundle["data2"])

        """  # noqa: E501
        keyed_tuple = result_tuple(labels, [() for l in labels])

        def proc(row: Row[Unpack[TupleAny]]) -> Any:
            return keyed_tuple([proc(row) for proc in procs])

        return proc

1756 

1757 

def _orm_full_deannotate(element: _SA) -> _SA:
    """Return ``element`` as processed by ``sql_util._deep_deannotate``."""
    deannotated = sql_util._deep_deannotate(element)
    return deannotated

1760 

1761 

1762class _ORMJoin(expression.Join): 

1763 """Extend Join to support ORM constructs as input.""" 

1764 

1765 __visit_name__ = expression.Join.__visit_name__ 

1766 

1767 inherit_cache = True 

1768 

1769 def __init__( 

1770 self, 

1771 left: _FromClauseArgument, 

1772 right: _FromClauseArgument, 

1773 onclause: Optional[_OnClauseArgument] = None, 

1774 isouter: bool = False, 

1775 full: bool = False, 

1776 _left_memo: Optional[Any] = None, 

1777 _right_memo: Optional[Any] = None, 

1778 _extra_criteria: Tuple[ColumnElement[bool], ...] = (), 

1779 ): 

1780 left_info = cast( 

1781 "Union[FromClause, _InternalEntityType[Any]]", 

1782 inspection.inspect(left), 

1783 ) 

1784 

1785 right_info = cast( 

1786 "Union[FromClause, _InternalEntityType[Any]]", 

1787 inspection.inspect(right), 

1788 ) 

1789 adapt_to = right_info.selectable 

1790 

1791 # used by joined eager loader 

1792 self._left_memo = _left_memo 

1793 self._right_memo = _right_memo 

1794 

1795 if isinstance(onclause, attributes.QueryableAttribute): 

1796 if TYPE_CHECKING: 

1797 assert isinstance( 

1798 onclause.comparator, RelationshipProperty.Comparator 

1799 ) 

1800 on_selectable = onclause.comparator._source_selectable() 

1801 prop = onclause.property 

1802 _extra_criteria += onclause._extra_criteria 

1803 elif isinstance(onclause, MapperProperty): 

1804 # used internally by joined eager loader...possibly not ideal 

1805 prop = onclause 

1806 on_selectable = prop.parent.selectable 

1807 else: 

1808 prop = None 

1809 on_selectable = None 

1810 

1811 left_selectable = left_info.selectable 

1812 if prop: 

1813 adapt_from: Optional[FromClause] 

1814 if sql_util.clause_is_present(on_selectable, left_selectable): 

1815 adapt_from = on_selectable 

1816 else: 

1817 assert isinstance(left_selectable, FromClause) 

1818 adapt_from = left_selectable 

1819 

1820 ( 

1821 pj, 

1822 sj, 

1823 source, 

1824 dest, 

1825 secondary, 

1826 target_adapter, 

1827 ) = prop._create_joins( 

1828 source_selectable=adapt_from, 

1829 dest_selectable=adapt_to, 

1830 source_polymorphic=True, 

1831 of_type_entity=right_info, 

1832 alias_secondary=True, 

1833 extra_criteria=_extra_criteria, 

1834 ) 

1835 

1836 if sj is not None: 

1837 if isouter: 

1838 # note this is an inner join from secondary->right 

1839 right = sql.join(secondary, right, sj) 

1840 onclause = pj 

1841 else: 

1842 left = sql.join(left, secondary, pj, isouter) 

1843 onclause = sj 

1844 else: 

1845 onclause = pj 

1846 

1847 self._target_adapter = target_adapter 

1848 

1849 # we don't use the normal coercions logic for _ORMJoin 

1850 # (probably should), so do some gymnastics to get the entity. 

1851 # logic here is for #8721, which was a major bug in 1.4 

1852 # for almost two years, not reported/fixed until 1.4.43 (!) 

1853 if is_selectable(left_info): 

1854 parententity = left_selectable._annotations.get( 

1855 "parententity", None 

1856 ) 

1857 elif insp_is_mapper(left_info) or insp_is_aliased_class(left_info): 

1858 parententity = left_info 

1859 else: 

1860 parententity = None 

1861 

1862 if parententity is not None: 

1863 self._annotations = self._annotations.union( 

1864 {"parententity": parententity} 

1865 ) 

1866 

1867 augment_onclause = bool(_extra_criteria) and not prop 

1868 expression.Join.__init__(self, left, right, onclause, isouter, full) 

1869 

1870 assert self.onclause is not None 

1871 

1872 if augment_onclause: 

1873 self.onclause &= sql.and_(*_extra_criteria) 

1874 

1875 if ( 

1876 not prop 

1877 and getattr(right_info, "mapper", None) 

1878 and right_info.mapper.single # type: ignore 

1879 ): 

1880 right_info = cast("_InternalEntityType[Any]", right_info) 

1881 # if single inheritance target and we are using a manual 

1882 # or implicit ON clause, augment it the same way we'd augment the 

1883 # WHERE. 

1884 single_crit = right_info.mapper._single_table_criterion 

1885 if single_crit is not None: 

1886 if insp_is_aliased_class(right_info): 

1887 single_crit = right_info._adapter.traverse(single_crit) 

1888 self.onclause = self.onclause & single_crit 

1889 

1890 def _splice_into_center(self, other): 

1891 """Splice a join into the center. 

1892 

1893 Given join(a, b) and join(b, c), return join(a, b).join(c) 

1894 

1895 """ 

1896 leftmost = other 

1897 while isinstance(leftmost, sql.Join): 

1898 leftmost = leftmost.left 

1899 

1900 assert self.right is leftmost 

1901 

1902 left = _ORMJoin( 

1903 self.left, 

1904 other.left, 

1905 self.onclause, 

1906 isouter=self.isouter, 

1907 _left_memo=self._left_memo, 

1908 _right_memo=other._left_memo._path_registry, 

1909 ) 

1910 

1911 return _ORMJoin( 

1912 left, 

1913 other.right, 

1914 other.onclause, 

1915 isouter=other.isouter, 

1916 _right_memo=other._right_memo, 

1917 ) 

1918 

1919 def join( 

1920 self, 

1921 right: _FromClauseArgument, 

1922 onclause: Optional[_OnClauseArgument] = None, 

1923 isouter: bool = False, 

1924 full: bool = False, 

1925 ) -> _ORMJoin: 

1926 return _ORMJoin(self, right, onclause, full=full, isouter=isouter) 

1927 

def outerjoin(
    self,
    right: _FromClauseArgument,
    onclause: Optional[_OnClauseArgument] = None,
    full: bool = False,
) -> _ORMJoin:
    """Return a new :class:`._ORMJoin` with this join as its left side,
    always passing ``isouter=True``.

    :param right: the right side of the new join.
    :param onclause: optional ON clause, passed through unchanged.
    :param full: passed through as the ``full`` flag.

    """
    joined = _ORMJoin(self, right, onclause, full=full, isouter=True)
    return joined

1935 

1936 

def with_parent(
    instance: object,
    prop: attributes.QueryableAttribute[Any],
    from_entity: Optional[_EntityType[Any]] = None,
) -> ColumnElement[bool]:
    """Create filtering criterion that relates this query's primary entity
    to the given related instance, using established
    :func:`_orm.relationship()`
    configuration.

    E.g.::

        stmt = select(Address).where(with_parent(some_user, User.addresses))

    The SQL rendered is the same as that rendered when a lazy loader
    would fire off from the given parent on that attribute, meaning
    that the appropriate state is taken from the parent object in
    Python without the need to render joins to the parent table
    in the rendered statement.

    The given property may also make use of :meth:`_orm.PropComparator.of_type`
    to indicate the left side of the criteria::


        a1 = aliased(Address)
        a2 = aliased(Address)
        stmt = select(a1, a2).where(with_parent(u1, User.addresses.of_type(a2)))

    The above use is equivalent to using the
    :paramref:`_orm.with_parent.from_entity` argument::

        a1 = aliased(Address)
        a2 = aliased(Address)
        stmt = select(a1, a2).where(
            with_parent(u1, User.addresses, from_entity=a2)
        )

    :param instance:
      An instance which has some :func:`_orm.relationship`.

    :param prop:
      Class-bound attribute, which indicates
      what relationship from the instance should be used to reconcile the
      parent/child relationship.  Strings are rejected.

    :param from_entity:
      Entity in which to consider as the left side.  This defaults to the
      "zero" entity of the :class:`_query.Query` itself.  When ``prop``
      carries an ``of_type()`` entity, that entity is used instead of the
      value passed here.

    :raises ArgumentError: if ``prop`` is a string, or is a class-bound
      attribute that does not refer to a relationship.

    """  # noqa: E501
    prop_t: RelationshipProperty[Any]

    if isinstance(prop, str):
        raise sa_exc.ArgumentError(
            "with_parent() accepts class-bound mapped attributes, not strings"
        )

    if not isinstance(prop, attributes.QueryableAttribute):
        # anything else is used as given
        prop_t = prop
        return prop_t._with_parent(instance, from_entity=from_entity)

    # an of_type() on the attribute overrides the from_entity argument
    if prop._of_type:
        from_entity = prop._of_type

    mapper_property = prop.property
    is_relationship = mapper_property is not None and prop_is_relationship(
        mapper_property
    )
    if not is_relationship:
        raise sa_exc.ArgumentError(
            f"Expected relationship property for with_parent(), "
            f"got {mapper_property}"
        )

    prop_t = mapper_property
    return prop_t._with_parent(instance, from_entity=from_entity)

2009 

2010 

def has_identity(object_: object) -> bool:
    """Return True if the given object has a database
    identity.

    This typically corresponds to the object being
    in either the persistent or detached state.

    The value is read from the ``has_identity`` attribute of the
    object's instance state.

    .. seealso::

        :func:`.was_deleted`

    """
    return attributes.instance_state(object_).has_identity

2025 

2026 

def was_deleted(object_: object) -> bool:
    """Return True if the given object was deleted
    within a session flush.

    This is regardless of whether or not the object is
    persistent or detached.

    The value is read from the ``was_deleted`` attribute of the
    object's instance state.

    .. seealso::

        :attr:`.InstanceState.was_deleted`

    """
    return attributes.instance_state(object_).was_deleted

2042 

2043 

def _entity_corresponds_to(
    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
) -> bool:
    """determine if 'given' corresponds to 'entity', in terms
    of an entity passed to Query that would match the same entity
    being referred to elsewhere in the query.

    """
    if insp_is_aliased_class(entity):
        # an aliased entity only corresponds to another aliased entity
        # that shares the same base alias
        if not insp_is_aliased_class(given):
            return False
        return entity._base_alias() is given._base_alias()

    if insp_is_aliased_class(given):
        # entity is not aliased here, 'given' is
        if given._use_mapper_path:
            return entity in given.with_polymorphic_mappers
        return entity is given

    # neither side is an aliased class
    assert insp_is_mapper(given)
    return entity.common_parent(given)

2065 

2066 

def _entity_corresponds_to_use_path_impl(
    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
) -> bool:
    """determine if 'given' corresponds to 'entity', in terms
    of a path of loader options where a mapped attribute is taken to
    be a member of a parent entity.

    e.g.::

        someoption(A).someoption(A.b)  # -> fn(A, A) -> True
        someoption(A).someoption(C.d)  # -> fn(A, C) -> False

        a1 = aliased(A)
        someoption(a1).someoption(A.b)  # -> fn(a1, A) -> False
        someoption(a1).someoption(a1.b)  # -> fn(a1, a1) -> True

        wp = with_polymorphic(A, [A1, A2])
        someoption(wp).someoption(A1.foo)  # -> fn(wp, A1) -> False
        someoption(wp).someoption(wp.A1.foo)  # -> fn(wp, wp.A1) -> True

    """
    if insp_is_aliased_class(given):
        # aliased 'given': entity must itself be an aliased class that
        # doesn't use the mapper path, and be either the same object or
        # one of given's with_polymorphic entities
        return (
            insp_is_aliased_class(entity)
            and not entity._use_mapper_path
            and (given is entity or entity in given._with_polymorphic_entities)
        )

    if insp_is_aliased_class(entity):
        # non-aliased 'given' against an aliased entity
        return (
            entity._use_mapper_path
            and given in entity.with_polymorphic_mappers
        )

    # neither is an aliased class
    return given.isa(entity.mapper)

2101 

2102 

2103def _entity_isa(given: _InternalEntityType[Any], mapper: Mapper[Any]) -> bool: 

2104 """determine if 'given' "is a" mapper, in terms of the given 

2105 would load rows of type 'mapper'. 

2106 

2107 """ 

2108 if given.is_aliased_class: 

2109 return mapper in given.with_polymorphic_mappers or given.mapper.isa( 

2110 mapper 

2111 ) 

2112 elif given.with_polymorphic_mappers: 

2113 return mapper in given.with_polymorphic_mappers or given.isa(mapper) 

2114 else: 

2115 return given.isa(mapper) 

2116 

2117 

2118def _getitem(iterable_query: Query[Any], item: Any) -> Any: 

2119 """calculate __getitem__ in terms of an iterable query object 

2120 that also has a slice() method. 

2121 

2122 """ 

2123 

2124 def _no_negative_indexes(): 

2125 raise IndexError( 

2126 "negative indexes are not accepted by SQL " 

2127 "index / slice operators" 

2128 ) 

2129 

2130 if isinstance(item, slice): 

2131 start, stop, step = util.decode_slice(item) 

2132 

2133 if ( 

2134 isinstance(stop, int) 

2135 and isinstance(start, int) 

2136 and stop - start <= 0 

2137 ): 

2138 return [] 

2139 

2140 elif (isinstance(start, int) and start < 0) or ( 

2141 isinstance(stop, int) and stop < 0 

2142 ): 

2143 _no_negative_indexes() 

2144 

2145 res = iterable_query.slice(start, stop) 

2146 if step is not None: 

2147 return list(res)[None : None : item.step] 

2148 else: 

2149 return list(res) 

2150 else: 

2151 if item == -1: 

2152 _no_negative_indexes() 

2153 else: 

2154 return list(iterable_query[item : item + 1])[0] 

2155 

2156 

def _is_mapped_annotation(
    raw_annotation: _AnnotationScanType,
    cls: Type[Any],
    originating_cls: Type[Any],
) -> bool:
    """Report whether ``raw_annotation`` de-stringifies to something whose
    origin is ``_MappedAnnotationBase``.

    The annotation is resolved against the module of ``originating_cls``.
    An annotation that can't be resolved due to ``NameError`` is reported
    as not mapped rather than raising.

    """
    try:
        resolved = de_stringify_annotation(
            cls, raw_annotation, originating_cls.__module__
        )
    except NameError:
        # in most cases, at least within our own tests, we can raise
        # here, which is more accurate as it prevents us from returning
        # false negatives.  However, in the real world, try to avoid getting
        # involved with end-user annotations that have nothing to do with us.
        # see issue #8888 where we bypass using this function in the case
        # that we want to detect an unresolvable Mapped[] type.
        return False

    return is_origin_of_cls(resolved, _MappedAnnotationBase)

2176 

2177 

2178class _CleanupError(Exception): 

2179 pass 

2180 

2181 

2182def _cleanup_mapped_str_annotation( 

2183 annotation: str, originating_module: str 

2184) -> str: 

2185 # fix up an annotation that comes in as the form: 

2186 # 'Mapped[List[Address]]' so that it instead looks like: 

2187 # 'Mapped[List["Address"]]' , which will allow us to get 

2188 # "Address" as a string 

2189 

2190 # additionally, resolve symbols for these names since this is where 

2191 # we'd have to do it 

2192 

2193 inner: Optional[Match[str]] 

2194 

2195 mm = re.match(r"^([^ \|]+?)\[(.+)\]$", annotation) 

2196 

2197 if not mm: 

2198 return annotation 

2199 

2200 # ticket #8759. Resolve the Mapped name to a real symbol. 

2201 # originally this just checked the name. 

2202 try: 

2203 obj = eval_name_only(mm.group(1), originating_module) 

2204 except NameError as ne: 

2205 raise _CleanupError( 

2206 f'For annotation "{annotation}", could not resolve ' 

2207 f'container type "{mm.group(1)}". ' 

2208 "Please ensure this type is imported at the module level " 

2209 "outside of TYPE_CHECKING blocks" 

2210 ) from ne 

2211 

2212 if obj is typing.ClassVar: 

2213 real_symbol = "ClassVar" 

2214 else: 

2215 try: 

2216 if issubclass(obj, _MappedAnnotationBase): 

2217 real_symbol = obj.__name__ 

2218 else: 

2219 return annotation 

2220 except TypeError: 

2221 # avoid isinstance(obj, type) check, just catch TypeError 

2222 return annotation 

2223 

2224 # note: if one of the codepaths above didn't define real_symbol and 

2225 # then didn't return, real_symbol raises UnboundLocalError 

2226 # which is actually a NameError, and the calling routines don't 

2227 # notice this since they are catching NameError anyway. Just in case 

2228 # this is being modified in the future, something to be aware of. 

2229 

2230 stack = [] 

2231 inner = mm 

2232 while True: 

2233 stack.append(real_symbol if mm is inner else inner.group(1)) 

2234 g2 = inner.group(2) 

2235 inner = re.match(r"^([^ \|]+?)\[(.+)\]$", g2) 

2236 if inner is None: 

2237 stack.append(g2) 

2238 break 

2239 

2240 # stacks we want to rewrite, that is, quote the last entry which 

2241 # we think is a relationship class name: 

2242 # 

2243 # ['Mapped', 'List', 'Address'] 

2244 # ['Mapped', 'A'] 

2245 # 

2246 # stacks we dont want to rewrite, which are generally MappedColumn 

2247 # use cases: 

2248 # 

2249 # ['Mapped', "'Optional[Dict[str, str]]'"] 

2250 # ['Mapped', 'dict[str, str] | None'] 

2251 

2252 if ( 

2253 # avoid already quoted symbols such as 

2254 # ['Mapped', "'Optional[Dict[str, str]]'"] 

2255 not re.match(r"""^["'].*["']$""", stack[-1]) 

2256 # avoid further generics like Dict[] such as 

2257 # ['Mapped', 'dict[str, str] | None'], 

2258 # ['Mapped', 'list[int] | list[str]'], 

2259 # ['Mapped', 'Union[list[int], list[str]]'], 

2260 and not re.search(r"[\[\]]", stack[-1]) 

2261 ): 

2262 stripchars = "\"' " 

2263 stack[-1] = ", ".join( 

2264 f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",") 

2265 ) 

2266 

2267 annotation = "[".join(stack) + ("]" * (len(stack) - 1)) 

2268 

2269 return annotation 

2270 

2271 

2272def _extract_mapped_subtype( 

2273 raw_annotation: Optional[_AnnotationScanType], 

2274 cls: type, 

2275 originating_module: str, 

2276 key: str, 

2277 attr_cls: Type[Any], 

2278 required: bool, 

2279 is_dataclass_field: bool, 

2280 expect_mapped: bool = True, 

2281 raiseerr: bool = True, 

2282) -> Optional[Tuple[Union[_AnnotationScanType, str], Optional[type]]]: 

2283 """given an annotation, figure out if it's ``Mapped[something]`` and if 

2284 so, return the ``something`` part. 

2285 

2286 Includes error raise scenarios and other options. 

2287 

2288 """ 

2289 

2290 if raw_annotation is None: 

2291 if required: 

2292 raise orm_exc.MappedAnnotationError( 

2293 f"Python typing annotation is required for attribute " 

2294 f'"{cls.__name__}.{key}" when primary argument(s) for ' 

2295 f'"{attr_cls.__name__}" construct are None or not present' 

2296 ) 

2297 return None 

2298 

2299 try: 

2300 # destringify the "outside" of the annotation. note we are not 

2301 # adding include_generic so it will *not* dig into generic contents, 

2302 # which will remain as ForwardRef or plain str under future annotations 

2303 # mode. The full destringify happens later when mapped_column goes 

2304 # to do a full lookup in the registry type_annotations_map. 

2305 annotated = de_stringify_annotation( 

2306 cls, 

2307 raw_annotation, 

2308 originating_module, 

2309 str_cleanup_fn=_cleanup_mapped_str_annotation, 

2310 ) 

2311 except _CleanupError as ce: 

2312 raise orm_exc.MappedAnnotationError( 

2313 f"Could not interpret annotation {raw_annotation}. " 

2314 "Check that it uses names that are correctly imported at the " 

2315 "module level. See chained stack trace for more hints." 

2316 ) from ce 

2317 except NameError as ne: 

2318 if raiseerr and "Mapped[" in raw_annotation: # type: ignore 

2319 raise orm_exc.MappedAnnotationError( 

2320 f"Could not interpret annotation {raw_annotation}. " 

2321 "Check that it uses names that are correctly imported at the " 

2322 "module level. See chained stack trace for more hints." 

2323 ) from ne 

2324 

2325 annotated = raw_annotation # type: ignore 

2326 

2327 if is_dataclass_field: 

2328 return annotated, None 

2329 else: 

2330 if not hasattr(annotated, "__origin__") or not is_origin_of_cls( 

2331 annotated, _MappedAnnotationBase 

2332 ): 

2333 if expect_mapped: 

2334 if not raiseerr: 

2335 return None 

2336 

2337 origin = getattr(annotated, "__origin__", None) 

2338 if origin is typing.ClassVar: 

2339 return None 

2340 

2341 # check for other kind of ORM descriptor like AssociationProxy, 

2342 # don't raise for that (issue #9957) 

2343 elif isinstance(origin, type) and issubclass( 

2344 origin, ORMDescriptor 

2345 ): 

2346 return None 

2347 

2348 raise orm_exc.MappedAnnotationError( 

2349 f'Type annotation for "{cls.__name__}.{key}" ' 

2350 "can't be correctly interpreted for " 

2351 "Annotated Declarative Table form. ORM annotations " 

2352 "should normally make use of the ``Mapped[]`` generic " 

2353 "type, or other ORM-compatible generic type, as a " 

2354 "container for the actual type, which indicates the " 

2355 "intent that the attribute is mapped. " 

2356 "Class variables that are not intended to be mapped " 

2357 "by the ORM should use ClassVar[]. " 

2358 "To allow Annotated Declarative to disregard legacy " 

2359 "annotations which don't use Mapped[] to pass, set " 

2360 '"__allow_unmapped__ = True" on the class or a ' 

2361 "superclass this class.", 

2362 code="zlpr", 

2363 ) 

2364 

2365 else: 

2366 return annotated, None 

2367 

2368 if len(annotated.__args__) != 1: 

2369 raise orm_exc.MappedAnnotationError( 

2370 "Expected sub-type for Mapped[] annotation" 

2371 ) 

2372 

2373 return ( 

2374 # fix dict/list/set args to be ForwardRef, see #11814 

2375 fixup_container_fwd_refs(annotated.__args__[0]), 

2376 annotated.__origin__, 

2377 ) 

2378 

2379 

2380def _mapper_property_as_plain_name(prop: Type[Any]) -> str: 

2381 if hasattr(prop, "_mapper_property_name"): 

2382 name = prop._mapper_property_name() 

2383 else: 

2384 name = None 

2385 return util.clsname_as_plain_name(prop, name)