Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/util.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

817 statements  

1# orm/util.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9from __future__ import annotations 

10 

11import enum 

12import functools 

13import re 

14import types 

15import typing 

16from typing import AbstractSet 

17from typing import Any 

18from typing import Callable 

19from typing import cast 

20from typing import Dict 

21from typing import FrozenSet 

22from typing import Generic 

23from typing import Iterable 

24from typing import Iterator 

25from typing import List 

26from typing import Match 

27from typing import Optional 

28from typing import Sequence 

29from typing import Tuple 

30from typing import Type 

31from typing import TYPE_CHECKING 

32from typing import TypeVar 

33from typing import Union 

34import weakref 

35 

36from . import attributes # noqa 

37from . import exc 

38from . import exc as orm_exc 

39from ._typing import _O 

40from ._typing import insp_is_aliased_class 

41from ._typing import insp_is_mapper 

42from ._typing import prop_is_relationship 

43from .base import _class_to_mapper as _class_to_mapper 

44from .base import _MappedAnnotationBase 

45from .base import _never_set as _never_set # noqa: F401 

46from .base import _none_only_set as _none_only_set # noqa: F401 

47from .base import _none_set as _none_set # noqa: F401 

48from .base import attribute_str as attribute_str # noqa: F401 

49from .base import class_mapper as class_mapper 

50from .base import DynamicMapped 

51from .base import InspectionAttr as InspectionAttr 

52from .base import instance_str as instance_str # noqa: F401 

53from .base import Mapped 

54from .base import object_mapper as object_mapper 

55from .base import object_state as object_state # noqa: F401 

56from .base import opt_manager_of_class 

57from .base import ORMDescriptor 

58from .base import state_attribute_str as state_attribute_str # noqa: F401 

59from .base import state_class_str as state_class_str # noqa: F401 

60from .base import state_str as state_str # noqa: F401 

61from .base import WriteOnlyMapped 

62from .interfaces import CriteriaOption 

63from .interfaces import MapperProperty as MapperProperty 

64from .interfaces import ORMColumnsClauseRole 

65from .interfaces import ORMEntityColumnsClauseRole 

66from .interfaces import ORMFromClauseRole 

67from .path_registry import PathRegistry as PathRegistry 

68from .. import event 

69from .. import exc as sa_exc 

70from .. import inspection 

71from .. import sql 

72from .. import util 

73from ..engine.result import result_tuple 

74from ..sql import coercions 

75from ..sql import expression 

76from ..sql import lambdas 

77from ..sql import roles 

78from ..sql import util as sql_util 

79from ..sql import visitors 

80from ..sql._typing import is_selectable 

81from ..sql.annotation import SupportsCloneAnnotations 

82from ..sql.base import ColumnCollection 

83from ..sql.cache_key import HasCacheKey 

84from ..sql.cache_key import MemoizedHasCacheKey 

85from ..sql.elements import ColumnElement 

86from ..sql.elements import KeyedColumnElement 

87from ..sql.selectable import FromClause 

88from ..util.langhelpers import MemoizedSlots 

89from ..util.typing import de_stringify_annotation as _de_stringify_annotation 

90from ..util.typing import eval_name_only as _eval_name_only 

91from ..util.typing import fixup_container_fwd_refs 

92from ..util.typing import get_origin 

93from ..util.typing import is_origin_of_cls 

94from ..util.typing import Literal 

95from ..util.typing import Protocol 

96 

97if typing.TYPE_CHECKING: 

98 from ._typing import _EntityType 

99 from ._typing import _IdentityKeyType 

100 from ._typing import _InternalEntityType 

101 from ._typing import _ORMCOLEXPR 

102 from .context import _MapperEntity 

103 from .context import ORMCompileState 

104 from .mapper import Mapper 

105 from .path_registry import AbstractEntityRegistry 

106 from .query import Query 

107 from .relationships import RelationshipProperty 

108 from ..engine import Row 

109 from ..engine import RowMapping 

110 from ..sql._typing import _CE 

111 from ..sql._typing import _ColumnExpressionArgument 

112 from ..sql._typing import _EquivalentColumnMap 

113 from ..sql._typing import _FromClauseArgument 

114 from ..sql._typing import _OnClauseArgument 

115 from ..sql._typing import _PropagateAttrsType 

116 from ..sql.annotation import _SA 

117 from ..sql.base import ReadOnlyColumnCollection 

118 from ..sql.elements import BindParameter 

119 from ..sql.selectable import _ColumnsClauseElement 

120 from ..sql.selectable import Select 

121 from ..sql.selectable import Selectable 

122 from ..sql.visitors import anon_map 

123 from ..util.typing import _AnnotationScanType 

124 

125_T = TypeVar("_T", bound=Any) 

126 

127all_cascades = frozenset( 

128 ( 

129 "delete", 

130 "delete-orphan", 

131 "all", 

132 "merge", 

133 "expunge", 

134 "save-update", 

135 "refresh-expire", 

136 "none", 

137 ) 

138) 

139 

140_de_stringify_partial = functools.partial( 

141 functools.partial, 

142 locals_=util.immutabledict( 

143 { 

144 "Mapped": Mapped, 

145 "WriteOnlyMapped": WriteOnlyMapped, 

146 "DynamicMapped": DynamicMapped, 

147 } 

148 ), 

149) 

150 

151# partial is practically useless as we have to write out the whole 

152# function and maintain the signature anyway 

153 

154 

155class _DeStringifyAnnotation(Protocol): 

156 def __call__( 

157 self, 

158 cls: Type[Any], 

159 annotation: _AnnotationScanType, 

160 originating_module: str, 

161 *, 

162 str_cleanup_fn: Optional[Callable[[str, str], str]] = None, 

163 include_generic: bool = False, 

164 ) -> Type[Any]: ... 

165 

166 

167de_stringify_annotation = cast( 

168 _DeStringifyAnnotation, _de_stringify_partial(_de_stringify_annotation) 

169) 

170 

171 

172class _EvalNameOnly(Protocol): 

173 def __call__(self, name: str, module_name: str) -> Any: ... 

174 

175 

176eval_name_only = cast(_EvalNameOnly, _de_stringify_partial(_eval_name_only)) 

177 

178 

179class CascadeOptions(FrozenSet[str]): 

180 """Keeps track of the options sent to 

181 :paramref:`.relationship.cascade`""" 

182 

183 _add_w_all_cascades = all_cascades.difference( 

184 ["all", "none", "delete-orphan"] 

185 ) 

186 _allowed_cascades = all_cascades 

187 

188 _viewonly_cascades = ["expunge", "all", "none", "refresh-expire", "merge"] 

189 

190 __slots__ = ( 

191 "save_update", 

192 "delete", 

193 "refresh_expire", 

194 "merge", 

195 "expunge", 

196 "delete_orphan", 

197 ) 

198 

199 save_update: bool 

200 delete: bool 

201 refresh_expire: bool 

202 merge: bool 

203 expunge: bool 

204 delete_orphan: bool 

205 

206 def __new__( 

207 cls, value_list: Optional[Union[Iterable[str], str]] 

208 ) -> CascadeOptions: 

209 if isinstance(value_list, str) or value_list is None: 

210 return cls.from_string(value_list) # type: ignore 

211 values = set(value_list) 

212 if values.difference(cls._allowed_cascades): 

213 raise sa_exc.ArgumentError( 

214 "Invalid cascade option(s): %s" 

215 % ", ".join( 

216 [ 

217 repr(x) 

218 for x in sorted( 

219 values.difference(cls._allowed_cascades) 

220 ) 

221 ] 

222 ) 

223 ) 

224 

225 if "all" in values: 

226 values.update(cls._add_w_all_cascades) 

227 if "none" in values: 

228 values.clear() 

229 values.discard("all") 

230 

231 self = super().__new__(cls, values) 

232 self.save_update = "save-update" in values 

233 self.delete = "delete" in values 

234 self.refresh_expire = "refresh-expire" in values 

235 self.merge = "merge" in values 

236 self.expunge = "expunge" in values 

237 self.delete_orphan = "delete-orphan" in values 

238 

239 if self.delete_orphan and not self.delete: 

240 util.warn("The 'delete-orphan' cascade option requires 'delete'.") 

241 return self 

242 

243 def __repr__(self): 

244 return "CascadeOptions(%r)" % (",".join([x for x in sorted(self)])) 

245 

246 @classmethod 

247 def from_string(cls, arg): 

248 values = [c for c in re.split(r"\s*,\s*", arg or "") if c] 

249 return cls(values) 

250 

251 

252def _validator_events(desc, key, validator, include_removes, include_backrefs): 

253 """Runs a validation method on an attribute value to be set or 

254 appended. 

255 """ 

256 

257 if not include_backrefs: 

258 

259 def detect_is_backref(state, initiator): 

260 impl = state.manager[key].impl 

261 return initiator.impl is not impl 

262 

263 if include_removes: 

264 

265 def append(state, value, initiator): 

266 if initiator.op is not attributes.OP_BULK_REPLACE and ( 

267 include_backrefs or not detect_is_backref(state, initiator) 

268 ): 

269 return validator(state.obj(), key, value, False) 

270 else: 

271 return value 

272 

273 def bulk_set(state, values, initiator): 

274 if include_backrefs or not detect_is_backref(state, initiator): 

275 obj = state.obj() 

276 values[:] = [ 

277 validator(obj, key, value, False) for value in values 

278 ] 

279 

280 def set_(state, value, oldvalue, initiator): 

281 if include_backrefs or not detect_is_backref(state, initiator): 

282 return validator(state.obj(), key, value, False) 

283 else: 

284 return value 

285 

286 def remove(state, value, initiator): 

287 if include_backrefs or not detect_is_backref(state, initiator): 

288 validator(state.obj(), key, value, True) 

289 

290 else: 

291 

292 def append(state, value, initiator): 

293 if initiator.op is not attributes.OP_BULK_REPLACE and ( 

294 include_backrefs or not detect_is_backref(state, initiator) 

295 ): 

296 return validator(state.obj(), key, value) 

297 else: 

298 return value 

299 

300 def bulk_set(state, values, initiator): 

301 if include_backrefs or not detect_is_backref(state, initiator): 

302 obj = state.obj() 

303 values[:] = [validator(obj, key, value) for value in values] 

304 

305 def set_(state, value, oldvalue, initiator): 

306 if include_backrefs or not detect_is_backref(state, initiator): 

307 return validator(state.obj(), key, value) 

308 else: 

309 return value 

310 

311 event.listen(desc, "append", append, raw=True, retval=True) 

312 event.listen(desc, "bulk_replace", bulk_set, raw=True) 

313 event.listen(desc, "set", set_, raw=True, retval=True) 

314 if include_removes: 

315 event.listen(desc, "remove", remove, raw=True, retval=True) 

316 

317 

318def polymorphic_union( 

319 table_map, typecolname, aliasname="p_union", cast_nulls=True 

320): 

321 """Create a ``UNION`` statement used by a polymorphic mapper. 

322 

323 See :ref:`concrete_inheritance` for an example of how 

324 this is used. 

325 

326 :param table_map: mapping of polymorphic identities to 

327 :class:`_schema.Table` objects. 

328 :param typecolname: string name of a "discriminator" column, which will be 

329 derived from the query, producing the polymorphic identity for 

330 each row. If ``None``, no polymorphic discriminator is generated. 

331 :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()` 

332 construct generated. 

333 :param cast_nulls: if True, non-existent columns, which are represented 

334 as labeled NULLs, will be passed into CAST. This is a legacy behavior 

335 that is problematic on some backends such as Oracle - in which case it 

336 can be set to False. 

337 

338 """ 

339 

340 colnames: util.OrderedSet[str] = util.OrderedSet() 

341 colnamemaps = {} 

342 types = {} 

343 for key in table_map: 

344 table = table_map[key] 

345 

346 table = coercions.expect( 

347 roles.StrictFromClauseRole, table, allow_select=True 

348 ) 

349 table_map[key] = table 

350 

351 m = {} 

352 for c in table.c: 

353 if c.key == typecolname: 

354 raise sa_exc.InvalidRequestError( 

355 "Polymorphic union can't use '%s' as the discriminator " 

356 "column due to mapped column %r; please apply the " 

357 "'typecolname' " 

358 "argument; this is available on " 

359 "ConcreteBase as '_concrete_discriminator_name'" 

360 % (typecolname, c) 

361 ) 

362 colnames.add(c.key) 

363 m[c.key] = c 

364 types[c.key] = c.type 

365 colnamemaps[table] = m 

366 

367 def col(name, table): 

368 try: 

369 return colnamemaps[table][name] 

370 except KeyError: 

371 if cast_nulls: 

372 return sql.cast(sql.null(), types[name]).label(name) 

373 else: 

374 return sql.type_coerce(sql.null(), types[name]).label(name) 

375 

376 result = [] 

377 for type_, table in table_map.items(): 

378 if typecolname is not None: 

379 result.append( 

380 sql.select( 

381 *( 

382 [col(name, table) for name in colnames] 

383 + [ 

384 sql.literal_column( 

385 sql_util._quote_ddl_expr(type_) 

386 ).label(typecolname) 

387 ] 

388 ) 

389 ).select_from(table) 

390 ) 

391 else: 

392 result.append( 

393 sql.select( 

394 *[col(name, table) for name in colnames] 

395 ).select_from(table) 

396 ) 

397 return sql.union_all(*result).alias(aliasname) 

398 

399 

400def identity_key( 

401 class_: Optional[Type[_T]] = None, 

402 ident: Union[Any, Tuple[Any, ...]] = None, 

403 *, 

404 instance: Optional[_T] = None, 

405 row: Optional[Union[Row[Any], RowMapping]] = None, 

406 identity_token: Optional[Any] = None, 

407) -> _IdentityKeyType[_T]: 

408 r"""Generate "identity key" tuples, as are used as keys in the 

409 :attr:`.Session.identity_map` dictionary. 

410 

411 This function has several call styles: 

412 

413 * ``identity_key(class, ident, identity_token=token)`` 

414 

415 This form receives a mapped class and a primary key scalar or 

416 tuple as an argument. 

417 

418 E.g.:: 

419 

420 >>> identity_key(MyClass, (1, 2)) 

421 (<class '__main__.MyClass'>, (1, 2), None) 

422 

423 :param class: mapped class (must be a positional argument) 

424 :param ident: primary key, may be a scalar or tuple argument. 

425 :param identity_token: optional identity token 

426 

427 .. versionadded:: 1.2 added identity_token 

428 

429 

430 * ``identity_key(instance=instance)`` 

431 

432 This form will produce the identity key for a given instance. The 

433 instance need not be persistent, only that its primary key attributes 

434 are populated (else the key will contain ``None`` for those missing 

435 values). 

436 

437 E.g.:: 

438 

439 >>> instance = MyClass(1, 2) 

440 >>> identity_key(instance=instance) 

441 (<class '__main__.MyClass'>, (1, 2), None) 

442 

443 In this form, the given instance is ultimately run though 

444 :meth:`_orm.Mapper.identity_key_from_instance`, which will have the 

445 effect of performing a database check for the corresponding row 

446 if the object is expired. 

447 

448 :param instance: object instance (must be given as a keyword arg) 

449 

450 * ``identity_key(class, row=row, identity_token=token)`` 

451 

452 This form is similar to the class/tuple form, except is passed a 

453 database result row as a :class:`.Row` or :class:`.RowMapping` object. 

454 

455 E.g.:: 

456 

457 >>> row = engine.execute(text("select * from table where a=1 and b=2")).first() 

458 >>> identity_key(MyClass, row=row) 

459 (<class '__main__.MyClass'>, (1, 2), None) 

460 

461 :param class: mapped class (must be a positional argument) 

462 :param row: :class:`.Row` row returned by a :class:`_engine.CursorResult` 

463 (must be given as a keyword arg) 

464 :param identity_token: optional identity token 

465 

466 .. versionadded:: 1.2 added identity_token 

467 

468 """ # noqa: E501 

469 if class_ is not None: 

470 mapper = class_mapper(class_) 

471 if row is None: 

472 if ident is None: 

473 raise sa_exc.ArgumentError("ident or row is required") 

474 return mapper.identity_key_from_primary_key( 

475 tuple(util.to_list(ident)), identity_token=identity_token 

476 ) 

477 else: 

478 return mapper.identity_key_from_row( 

479 row, identity_token=identity_token 

480 ) 

481 elif instance is not None: 

482 mapper = object_mapper(instance) 

483 return mapper.identity_key_from_instance(instance) 

484 else: 

485 raise sa_exc.ArgumentError("class or instance is required") 

486 

487 

488class _TraceAdaptRole(enum.Enum): 

489 """Enumeration of all the use cases for ORMAdapter. 

490 

491 ORMAdapter remains one of the most complicated aspects of the ORM, as it is 

492 used for in-place adaption of column expressions to be applied to a SELECT, 

493 replacing :class:`.Table` and other objects that are mapped to classes with 

494 aliases of those tables in the case of joined eager loading, or in the case 

495 of polymorphic loading as used with concrete mappings or other custom "with 

496 polymorphic" parameters, with whole user-defined subqueries. The 

497 enumerations provide an overview of all the use cases used by ORMAdapter, a 

498 layer of formality as to the introduction of new ORMAdapter use cases (of 

499 which none are anticipated), as well as a means to trace the origins of a 

500 particular ORMAdapter within runtime debugging. 

501 

502 SQLAlchemy 2.0 has greatly scaled back ORM features which relied heavily on 

503 open-ended statement adaption, including the ``Query.with_polymorphic()`` 

504 method and the ``Query.select_from_entity()`` methods, favoring 

505 user-explicit aliasing schemes using the ``aliased()`` and 

506 ``with_polymorphic()`` standalone constructs; these still use adaption, 

507 however the adaption is applied in a narrower scope. 

508 

509 """ 

510 

511 # aliased() use that is used to adapt individual attributes at query 

512 # construction time 

513 ALIASED_INSP = enum.auto() 

514 

515 # joinedload cases; typically adapt an ON clause of a relationship 

516 # join 

517 JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto() 

518 JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto() 

519 JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto() 

520 

521 # polymorphic cases - these are complex ones that replace FROM 

522 # clauses, replacing tables with subqueries 

523 MAPPER_POLYMORPHIC_ADAPTER = enum.auto() 

524 WITH_POLYMORPHIC_ADAPTER = enum.auto() 

525 WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto() 

526 DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto() 

527 

528 # the from_statement() case, used only to adapt individual attributes 

529 # from a given statement to local ORM attributes at result fetching 

530 # time. assigned to ORMCompileState._from_obj_alias 

531 ADAPT_FROM_STATEMENT = enum.auto() 

532 

533 # the joinedload for queries that have LIMIT/OFFSET/DISTINCT case; 

534 # the query is placed inside of a subquery with the LIMIT/OFFSET/etc., 

535 # joinedloads are then placed on the outside. 

536 # assigned to ORMCompileState.compound_eager_adapter 

537 COMPOUND_EAGER_STATEMENT = enum.auto() 

538 

539 # the legacy Query._set_select_from() case. 

540 # this is needed for Query's set operations (i.e. UNION, etc. ) 

541 # as well as "legacy from_self()", which while removed from 2.0 as 

542 # public API, is used for the Query.count() method. this one 

543 # still does full statement traversal 

544 # assigned to ORMCompileState._from_obj_alias 

545 LEGACY_SELECT_FROM_ALIAS = enum.auto() 

546 

547 

548class ORMStatementAdapter(sql_util.ColumnAdapter): 

549 """ColumnAdapter which includes a role attribute.""" 

550 

551 __slots__ = ("role",) 

552 

553 def __init__( 

554 self, 

555 role: _TraceAdaptRole, 

556 selectable: Selectable, 

557 *, 

558 equivalents: Optional[_EquivalentColumnMap] = None, 

559 adapt_required: bool = False, 

560 allow_label_resolve: bool = True, 

561 anonymize_labels: bool = False, 

562 adapt_on_names: bool = False, 

563 adapt_from_selectables: Optional[AbstractSet[FromClause]] = None, 

564 ): 

565 self.role = role 

566 super().__init__( 

567 selectable, 

568 equivalents=equivalents, 

569 adapt_required=adapt_required, 

570 allow_label_resolve=allow_label_resolve, 

571 anonymize_labels=anonymize_labels, 

572 adapt_on_names=adapt_on_names, 

573 adapt_from_selectables=adapt_from_selectables, 

574 ) 

575 

576 

577class ORMAdapter(sql_util.ColumnAdapter): 

578 """ColumnAdapter subclass which excludes adaptation of entities from 

579 non-matching mappers. 

580 

581 """ 

582 

583 __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp") 

584 

585 is_aliased_class: bool 

586 aliased_insp: Optional[AliasedInsp[Any]] 

587 

588 def __init__( 

589 self, 

590 role: _TraceAdaptRole, 

591 entity: _InternalEntityType[Any], 

592 *, 

593 equivalents: Optional[_EquivalentColumnMap] = None, 

594 adapt_required: bool = False, 

595 allow_label_resolve: bool = True, 

596 anonymize_labels: bool = False, 

597 selectable: Optional[Selectable] = None, 

598 limit_on_entity: bool = True, 

599 adapt_on_names: bool = False, 

600 adapt_from_selectables: Optional[AbstractSet[FromClause]] = None, 

601 ): 

602 self.role = role 

603 self.mapper = entity.mapper 

604 if selectable is None: 

605 selectable = entity.selectable 

606 if insp_is_aliased_class(entity): 

607 self.is_aliased_class = True 

608 self.aliased_insp = entity 

609 else: 

610 self.is_aliased_class = False 

611 self.aliased_insp = None 

612 

613 super().__init__( 

614 selectable, 

615 equivalents, 

616 adapt_required=adapt_required, 

617 allow_label_resolve=allow_label_resolve, 

618 anonymize_labels=anonymize_labels, 

619 include_fn=self._include_fn if limit_on_entity else None, 

620 adapt_on_names=adapt_on_names, 

621 adapt_from_selectables=adapt_from_selectables, 

622 ) 

623 

624 def _include_fn(self, elem): 

625 entity = elem._annotations.get("parentmapper", None) 

626 

627 return not entity or entity.isa(self.mapper) or self.mapper.isa(entity) 

628 

629 

630class AliasedClass( 

631 inspection.Inspectable["AliasedInsp[_O]"], ORMColumnsClauseRole[_O] 

632): 

633 r"""Represents an "aliased" form of a mapped class for usage with Query. 

634 

635 The ORM equivalent of a :func:`~sqlalchemy.sql.expression.alias` 

636 construct, this object mimics the mapped class using a 

637 ``__getattr__`` scheme and maintains a reference to a 

638 real :class:`~sqlalchemy.sql.expression.Alias` object. 

639 

640 A primary purpose of :class:`.AliasedClass` is to serve as an alternate 

641 within a SQL statement generated by the ORM, such that an existing 

642 mapped entity can be used in multiple contexts. A simple example:: 

643 

644 # find all pairs of users with the same name 

645 user_alias = aliased(User) 

646 session.query(User, user_alias).join( 

647 (user_alias, User.id > user_alias.id) 

648 ).filter(User.name == user_alias.name) 

649 

650 :class:`.AliasedClass` is also capable of mapping an existing mapped 

651 class to an entirely new selectable, provided this selectable is column- 

652 compatible with the existing mapped selectable, and it can also be 

653 configured in a mapping as the target of a :func:`_orm.relationship`. 

654 See the links below for examples. 

655 

656 The :class:`.AliasedClass` object is constructed typically using the 

657 :func:`_orm.aliased` function. It also is produced with additional 

658 configuration when using the :func:`_orm.with_polymorphic` function. 

659 

660 The resulting object is an instance of :class:`.AliasedClass`. 

661 This object implements an attribute scheme which produces the 

662 same attribute and method interface as the original mapped 

663 class, allowing :class:`.AliasedClass` to be compatible 

664 with any attribute technique which works on the original class, 

665 including hybrid attributes (see :ref:`hybrids_toplevel`). 

666 

667 The :class:`.AliasedClass` can be inspected for its underlying 

668 :class:`_orm.Mapper`, aliased selectable, and other information 

669 using :func:`_sa.inspect`:: 

670 

671 from sqlalchemy import inspect 

672 

673 my_alias = aliased(MyClass) 

674 insp = inspect(my_alias) 

675 

676 The resulting inspection object is an instance of :class:`.AliasedInsp`. 

677 

678 

679 .. seealso:: 

680 

681 :func:`.aliased` 

682 

683 :func:`.with_polymorphic` 

684 

685 :ref:`relationship_aliased_class` 

686 

687 :ref:`relationship_to_window_function` 

688 

689 

690 """ 

691 

692 __name__: str 

693 

694 def __init__( 

695 self, 

696 mapped_class_or_ac: _EntityType[_O], 

697 alias: Optional[FromClause] = None, 

698 name: Optional[str] = None, 

699 flat: bool = False, 

700 adapt_on_names: bool = False, 

701 with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]] = None, 

702 with_polymorphic_discriminator: Optional[ColumnElement[Any]] = None, 

703 base_alias: Optional[AliasedInsp[Any]] = None, 

704 use_mapper_path: bool = False, 

705 represents_outer_join: bool = False, 

706 ): 

707 insp = cast( 

708 "_InternalEntityType[_O]", inspection.inspect(mapped_class_or_ac) 

709 ) 

710 mapper = insp.mapper 

711 

712 nest_adapters = False 

713 

714 if alias is None: 

715 if insp.is_aliased_class and insp.selectable._is_subquery: 

716 alias = insp.selectable.alias() 

717 else: 

718 alias = ( 

719 mapper._with_polymorphic_selectable._anonymous_fromclause( 

720 name=name, 

721 flat=flat, 

722 ) 

723 ) 

724 elif insp.is_aliased_class: 

725 nest_adapters = True 

726 

727 assert alias is not None 

728 self._aliased_insp = AliasedInsp( 

729 self, 

730 insp, 

731 alias, 

732 name, 

733 ( 

734 with_polymorphic_mappers 

735 if with_polymorphic_mappers 

736 else mapper.with_polymorphic_mappers 

737 ), 

738 ( 

739 with_polymorphic_discriminator 

740 if with_polymorphic_discriminator is not None 

741 else mapper.polymorphic_on 

742 ), 

743 base_alias, 

744 use_mapper_path, 

745 adapt_on_names, 

746 represents_outer_join, 

747 nest_adapters, 

748 ) 

749 

750 self.__name__ = f"aliased({mapper.class_.__name__})" 

751 

752 @classmethod 

753 def _reconstitute_from_aliased_insp( 

754 cls, aliased_insp: AliasedInsp[_O] 

755 ) -> AliasedClass[_O]: 

756 obj = cls.__new__(cls) 

757 obj.__name__ = f"aliased({aliased_insp.mapper.class_.__name__})" 

758 obj._aliased_insp = aliased_insp 

759 

760 if aliased_insp._is_with_polymorphic: 

761 for sub_aliased_insp in aliased_insp._with_polymorphic_entities: 

762 if sub_aliased_insp is not aliased_insp: 

763 ent = AliasedClass._reconstitute_from_aliased_insp( 

764 sub_aliased_insp 

765 ) 

766 setattr(obj, sub_aliased_insp.class_.__name__, ent) 

767 

768 return obj 

769 

770 def __getattr__(self, key: str) -> Any: 

771 try: 

772 _aliased_insp = self.__dict__["_aliased_insp"] 

773 except KeyError: 

774 raise AttributeError() 

775 else: 

776 target = _aliased_insp._target 

777 # maintain all getattr mechanics 

778 attr = getattr(target, key) 

779 

780 # attribute is a method, that will be invoked against a 

781 # "self"; so just return a new method with the same function and 

782 # new self 

783 if hasattr(attr, "__call__") and hasattr(attr, "__self__"): 

784 return types.MethodType(attr.__func__, self) 

785 

786 # attribute is a descriptor, that will be invoked against a 

787 # "self"; so invoke the descriptor against this self 

788 if hasattr(attr, "__get__"): 

789 attr = attr.__get__(None, self) 

790 

791 # attributes within the QueryableAttribute system will want this 

792 # to be invoked so the object can be adapted 

793 if hasattr(attr, "adapt_to_entity"): 

794 attr = attr.adapt_to_entity(_aliased_insp) 

795 setattr(self, key, attr) 

796 

797 return attr 

798 

799 def _get_from_serialized( 

800 self, key: str, mapped_class: _O, aliased_insp: AliasedInsp[_O] 

801 ) -> Any: 

802 # this method is only used in terms of the 

803 # sqlalchemy.ext.serializer extension 

804 attr = getattr(mapped_class, key) 

805 if hasattr(attr, "__call__") and hasattr(attr, "__self__"): 

806 return types.MethodType(attr.__func__, self) 

807 

808 # attribute is a descriptor, that will be invoked against a 

809 # "self"; so invoke the descriptor against this self 

810 if hasattr(attr, "__get__"): 

811 attr = attr.__get__(None, self) 

812 

813 # attributes within the QueryableAttribute system will want this 

814 # to be invoked so the object can be adapted 

815 if hasattr(attr, "adapt_to_entity"): 

816 aliased_insp._weak_entity = weakref.ref(self) 

817 attr = attr.adapt_to_entity(aliased_insp) 

818 setattr(self, key, attr) 

819 

820 return attr 

821 

822 def __repr__(self) -> str: 

823 return "<AliasedClass at 0x%x; %s>" % ( 

824 id(self), 

825 self._aliased_insp._target.__name__, 

826 ) 

827 

828 def __str__(self) -> str: 

829 return str(self._aliased_insp) 

830 

831 

832@inspection._self_inspects 

833class AliasedInsp( 

834 ORMEntityColumnsClauseRole[_O], 

835 ORMFromClauseRole, 

836 HasCacheKey, 

837 InspectionAttr, 

838 MemoizedSlots, 

839 inspection.Inspectable["AliasedInsp[_O]"], 

840 Generic[_O], 

841): 

842 """Provide an inspection interface for an 

843 :class:`.AliasedClass` object. 

844 

845 The :class:`.AliasedInsp` object is returned 

846 given an :class:`.AliasedClass` using the 

847 :func:`_sa.inspect` function:: 

848 

849 from sqlalchemy import inspect 

850 from sqlalchemy.orm import aliased 

851 

852 my_alias = aliased(MyMappedClass) 

853 insp = inspect(my_alias) 

854 

855 Attributes on :class:`.AliasedInsp` 

856 include: 

857 

858 * ``entity`` - the :class:`.AliasedClass` represented. 

859 * ``mapper`` - the :class:`_orm.Mapper` mapping the underlying class. 

860 * ``selectable`` - the :class:`_expression.Alias` 

861 construct which ultimately 

862 represents an aliased :class:`_schema.Table` or 

863 :class:`_expression.Select` 

864 construct. 

865 * ``name`` - the name of the alias. Also is used as the attribute 

866 name when returned in a result tuple from :class:`_query.Query`. 

867 * ``with_polymorphic_mappers`` - collection of :class:`_orm.Mapper` 

868 objects 

869 indicating all those mappers expressed in the select construct 

870 for the :class:`.AliasedClass`. 

871 * ``polymorphic_on`` - an alternate column or SQL expression which 

872 will be used as the "discriminator" for a polymorphic load. 

873 

874 .. seealso:: 

875 

876 :ref:`inspection_toplevel` 

877 

878 """ 

879 

880 __slots__ = ( 

881 "__weakref__", 

882 "_weak_entity", 

883 "mapper", 

884 "selectable", 

885 "name", 

886 "_adapt_on_names", 

887 "with_polymorphic_mappers", 

888 "polymorphic_on", 

889 "_use_mapper_path", 

890 "_base_alias", 

891 "represents_outer_join", 

892 "persist_selectable", 

893 "local_table", 

894 "_is_with_polymorphic", 

895 "_with_polymorphic_entities", 

896 "_adapter", 

897 "_target", 

898 "__clause_element__", 

899 "_memoized_values", 

900 "_all_column_expressions", 

901 "_nest_adapters", 

902 ) 

903 

904 _cache_key_traversal = [ 

905 ("name", visitors.ExtendedInternalTraversal.dp_string), 

906 ("_adapt_on_names", visitors.ExtendedInternalTraversal.dp_boolean), 

907 ("_use_mapper_path", visitors.ExtendedInternalTraversal.dp_boolean), 

908 ("_target", visitors.ExtendedInternalTraversal.dp_inspectable), 

909 ("selectable", visitors.ExtendedInternalTraversal.dp_clauseelement), 

910 ( 

911 "with_polymorphic_mappers", 

912 visitors.InternalTraversal.dp_has_cache_key_list, 

913 ), 

914 ("polymorphic_on", visitors.InternalTraversal.dp_clauseelement), 

915 ] 

916 

917 mapper: Mapper[_O] 

918 selectable: FromClause 

919 _adapter: ORMAdapter 

920 with_polymorphic_mappers: Sequence[Mapper[Any]] 

921 _with_polymorphic_entities: Sequence[AliasedInsp[Any]] 

922 

923 _weak_entity: weakref.ref[AliasedClass[_O]] 

924 """the AliasedClass that refers to this AliasedInsp""" 

925 

926 _target: Union[Type[_O], AliasedClass[_O]] 

927 """the thing referenced by the AliasedClass/AliasedInsp. 

928 

929 In the vast majority of cases, this is the mapped class. However 

930 it may also be another AliasedClass (alias of alias). 

931 

932 """ 

933 

934 def __init__( 

935 self, 

936 entity: AliasedClass[_O], 

937 inspected: _InternalEntityType[_O], 

938 selectable: FromClause, 

939 name: Optional[str], 

940 with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]], 

941 polymorphic_on: Optional[ColumnElement[Any]], 

942 _base_alias: Optional[AliasedInsp[Any]], 

943 _use_mapper_path: bool, 

944 adapt_on_names: bool, 

945 represents_outer_join: bool, 

946 nest_adapters: bool, 

947 ): 

948 mapped_class_or_ac = inspected.entity 

949 mapper = inspected.mapper 

950 

951 self._weak_entity = weakref.ref(entity) 

952 self.mapper = mapper 

953 self.selectable = self.persist_selectable = self.local_table = ( 

954 selectable 

955 ) 

956 self.name = name 

957 self.polymorphic_on = polymorphic_on 

958 self._base_alias = weakref.ref(_base_alias or self) 

959 self._use_mapper_path = _use_mapper_path 

960 self.represents_outer_join = represents_outer_join 

961 self._nest_adapters = nest_adapters 

962 

963 if with_polymorphic_mappers: 

964 self._is_with_polymorphic = True 

965 self.with_polymorphic_mappers = with_polymorphic_mappers 

966 self._with_polymorphic_entities = [] 

967 for poly in self.with_polymorphic_mappers: 

968 if poly is not mapper: 

969 ent = AliasedClass( 

970 poly.class_, 

971 selectable, 

972 base_alias=self, 

973 adapt_on_names=adapt_on_names, 

974 use_mapper_path=_use_mapper_path, 

975 ) 

976 

977 setattr(self.entity, poly.class_.__name__, ent) 

978 self._with_polymorphic_entities.append(ent._aliased_insp) 

979 

980 else: 

981 self._is_with_polymorphic = False 

982 self.with_polymorphic_mappers = [mapper] 

983 

984 self._adapter = ORMAdapter( 

985 _TraceAdaptRole.ALIASED_INSP, 

986 mapper, 

987 selectable=selectable, 

988 equivalents=mapper._equivalent_columns, 

989 adapt_on_names=adapt_on_names, 

990 anonymize_labels=True, 

991 # make sure the adapter doesn't try to grab other tables that 

992 # are not even the thing we are mapping, such as embedded 

993 # selectables in subqueries or CTEs. See issue #6060 

994 adapt_from_selectables={ 

995 m.selectable 

996 for m in self.with_polymorphic_mappers 

997 if not adapt_on_names 

998 }, 

999 limit_on_entity=False, 

1000 ) 

1001 

1002 if nest_adapters: 

1003 # supports "aliased class of aliased class" use case 

1004 assert isinstance(inspected, AliasedInsp) 

1005 self._adapter = inspected._adapter.wrap(self._adapter) 

1006 

1007 self._adapt_on_names = adapt_on_names 

1008 self._target = mapped_class_or_ac 

1009 

1010 @classmethod 

1011 def _alias_factory( 

1012 cls, 

1013 element: Union[_EntityType[_O], FromClause], 

1014 alias: Optional[FromClause] = None, 

1015 name: Optional[str] = None, 

1016 flat: bool = False, 

1017 adapt_on_names: bool = False, 

1018 ) -> Union[AliasedClass[_O], FromClause]: 

1019 if isinstance(element, FromClause): 

1020 if adapt_on_names: 

1021 raise sa_exc.ArgumentError( 

1022 "adapt_on_names only applies to ORM elements" 

1023 ) 

1024 if name: 

1025 return element.alias(name=name, flat=flat) 

1026 else: 

1027 return coercions.expect( 

1028 roles.AnonymizedFromClauseRole, element, flat=flat 

1029 ) 

1030 else: 

1031 return AliasedClass( 

1032 element, 

1033 alias=alias, 

1034 flat=flat, 

1035 name=name, 

1036 adapt_on_names=adapt_on_names, 

1037 ) 

1038 

1039 @classmethod 

1040 def _with_polymorphic_factory( 

1041 cls, 

1042 base: Union[Type[_O], Mapper[_O]], 

1043 classes: Union[Literal["*"], Iterable[_EntityType[Any]]], 

1044 selectable: Union[Literal[False, None], FromClause] = False, 

1045 flat: bool = False, 

1046 polymorphic_on: Optional[ColumnElement[Any]] = None, 

1047 aliased: bool = False, 

1048 innerjoin: bool = False, 

1049 adapt_on_names: bool = False, 

1050 name: Optional[str] = None, 

1051 _use_mapper_path: bool = False, 

1052 ) -> AliasedClass[_O]: 

1053 primary_mapper = _class_to_mapper(base) 

1054 

1055 if selectable not in (None, False) and flat: 

1056 raise sa_exc.ArgumentError( 

1057 "the 'flat' and 'selectable' arguments cannot be passed " 

1058 "simultaneously to with_polymorphic()" 

1059 ) 

1060 

1061 mappers, selectable = primary_mapper._with_polymorphic_args( 

1062 classes, selectable, innerjoin=innerjoin 

1063 ) 

1064 if aliased or flat: 

1065 assert selectable is not None 

1066 selectable = selectable._anonymous_fromclause(flat=flat) 

1067 

1068 return AliasedClass( 

1069 base, 

1070 selectable, 

1071 name=name, 

1072 with_polymorphic_mappers=mappers, 

1073 adapt_on_names=adapt_on_names, 

1074 with_polymorphic_discriminator=polymorphic_on, 

1075 use_mapper_path=_use_mapper_path, 

1076 represents_outer_join=not innerjoin, 

1077 ) 

1078 

1079 @property 

1080 def entity(self) -> AliasedClass[_O]: 

1081 # to eliminate reference cycles, the AliasedClass is held weakly. 

1082 # this produces some situations where the AliasedClass gets lost, 

1083 # particularly when one is created internally and only the AliasedInsp 

1084 # is passed around. 

1085 # to work around this case, we just generate a new one when we need 

1086 # it, as it is a simple class with very little initial state on it. 

1087 ent = self._weak_entity() 

1088 if ent is None: 

1089 ent = AliasedClass._reconstitute_from_aliased_insp(self) 

1090 self._weak_entity = weakref.ref(ent) 

1091 return ent 

1092 

1093 is_aliased_class = True 

1094 "always returns True" 

1095 

1096 def _memoized_method___clause_element__(self) -> FromClause: 

1097 return self.selectable._annotate( 

1098 { 

1099 "parentmapper": self.mapper, 

1100 "parententity": self, 

1101 "entity_namespace": self, 

1102 } 

1103 )._set_propagate_attrs( 

1104 {"compile_state_plugin": "orm", "plugin_subject": self} 

1105 ) 

1106 

1107 @property 

1108 def entity_namespace(self) -> AliasedClass[_O]: 

1109 return self.entity 

1110 

1111 @property 

1112 def class_(self) -> Type[_O]: 

1113 """Return the mapped class ultimately represented by this 

1114 :class:`.AliasedInsp`.""" 

1115 return self.mapper.class_ 

1116 

1117 @property 

1118 def _path_registry(self) -> AbstractEntityRegistry: 

1119 if self._use_mapper_path: 

1120 return self.mapper._path_registry 

1121 else: 

1122 return PathRegistry.per_mapper(self) 

1123 

1124 def __getstate__(self) -> Dict[str, Any]: 

1125 return { 

1126 "entity": self.entity, 

1127 "mapper": self.mapper, 

1128 "alias": self.selectable, 

1129 "name": self.name, 

1130 "adapt_on_names": self._adapt_on_names, 

1131 "with_polymorphic_mappers": self.with_polymorphic_mappers, 

1132 "with_polymorphic_discriminator": self.polymorphic_on, 

1133 "base_alias": self._base_alias(), 

1134 "use_mapper_path": self._use_mapper_path, 

1135 "represents_outer_join": self.represents_outer_join, 

1136 "nest_adapters": self._nest_adapters, 

1137 } 

1138 

1139 def __setstate__(self, state: Dict[str, Any]) -> None: 

1140 self.__init__( # type: ignore 

1141 state["entity"], 

1142 state["mapper"], 

1143 state["alias"], 

1144 state["name"], 

1145 state["with_polymorphic_mappers"], 

1146 state["with_polymorphic_discriminator"], 

1147 state["base_alias"], 

1148 state["use_mapper_path"], 

1149 state["adapt_on_names"], 

1150 state["represents_outer_join"], 

1151 state["nest_adapters"], 

1152 ) 

1153 

1154 def _merge_with(self, other: AliasedInsp[_O]) -> AliasedInsp[_O]: 

1155 # assert self._is_with_polymorphic 

1156 # assert other._is_with_polymorphic 

1157 

1158 primary_mapper = other.mapper 

1159 

1160 assert self.mapper is primary_mapper 

1161 

1162 our_classes = util.to_set( 

1163 mp.class_ for mp in self.with_polymorphic_mappers 

1164 ) 

1165 new_classes = {mp.class_ for mp in other.with_polymorphic_mappers} 

1166 if our_classes == new_classes: 

1167 return other 

1168 else: 

1169 classes = our_classes.union(new_classes) 

1170 

1171 mappers, selectable = primary_mapper._with_polymorphic_args( 

1172 classes, None, innerjoin=not other.represents_outer_join 

1173 ) 

1174 selectable = selectable._anonymous_fromclause(flat=True) 

1175 return AliasedClass( 

1176 primary_mapper, 

1177 selectable, 

1178 with_polymorphic_mappers=mappers, 

1179 with_polymorphic_discriminator=other.polymorphic_on, 

1180 use_mapper_path=other._use_mapper_path, 

1181 represents_outer_join=other.represents_outer_join, 

1182 )._aliased_insp 

1183 

1184 def _adapt_element( 

1185 self, expr: _ORMCOLEXPR, key: Optional[str] = None 

1186 ) -> _ORMCOLEXPR: 

1187 assert isinstance(expr, ColumnElement) 

1188 d: Dict[str, Any] = { 

1189 "parententity": self, 

1190 "parentmapper": self.mapper, 

1191 } 

1192 if key: 

1193 d["proxy_key"] = key 

1194 

1195 # IMO mypy should see this one also as returning the same type 

1196 # we put into it, but it's not 

1197 return ( 

1198 self._adapter.traverse(expr) 

1199 ._annotate(d) 

1200 ._set_propagate_attrs( 

1201 {"compile_state_plugin": "orm", "plugin_subject": self} 

1202 ) 

1203 ) 

1204 

1205 if TYPE_CHECKING: 

1206 # establish compatibility with the _ORMAdapterProto protocol, 

1207 # which in turn is compatible with _CoreAdapterProto. 

1208 

1209 def _orm_adapt_element( 

1210 self, 

1211 obj: _CE, 

1212 key: Optional[str] = None, 

1213 ) -> _CE: ... 

1214 

1215 else: 

1216 _orm_adapt_element = _adapt_element 

1217 

1218 def _entity_for_mapper(self, mapper): 

1219 self_poly = self.with_polymorphic_mappers 

1220 if mapper in self_poly: 

1221 if mapper is self.mapper: 

1222 return self 

1223 else: 

1224 return getattr( 

1225 self.entity, mapper.class_.__name__ 

1226 )._aliased_insp 

1227 elif mapper.isa(self.mapper): 

1228 return self 

1229 else: 

1230 assert False, "mapper %s doesn't correspond to %s" % (mapper, self) 

1231 

1232 def _memoized_attr__get_clause(self): 

1233 onclause, replacemap = self.mapper._get_clause 

1234 return ( 

1235 self._adapter.traverse(onclause), 

1236 { 

1237 self._adapter.traverse(col): param 

1238 for col, param in replacemap.items() 

1239 }, 

1240 ) 

1241 

1242 def _memoized_attr__memoized_values(self): 

1243 return {} 

1244 

1245 def _memoized_attr__all_column_expressions(self): 

1246 if self._is_with_polymorphic: 

1247 cols_plus_keys = self.mapper._columns_plus_keys( 

1248 [ent.mapper for ent in self._with_polymorphic_entities] 

1249 ) 

1250 else: 

1251 cols_plus_keys = self.mapper._columns_plus_keys() 

1252 

1253 cols_plus_keys = [ 

1254 (key, self._adapt_element(col)) for key, col in cols_plus_keys 

1255 ] 

1256 

1257 return ColumnCollection(cols_plus_keys) 

1258 

1259 def _memo(self, key, callable_, *args, **kw): 

1260 if key in self._memoized_values: 

1261 return self._memoized_values[key] 

1262 else: 

1263 self._memoized_values[key] = value = callable_(*args, **kw) 

1264 return value 

1265 

1266 def __repr__(self): 

1267 if self.with_polymorphic_mappers: 

1268 with_poly = "(%s)" % ", ".join( 

1269 mp.class_.__name__ for mp in self.with_polymorphic_mappers 

1270 ) 

1271 else: 

1272 with_poly = "" 

1273 return "<AliasedInsp at 0x%x; %s%s>" % ( 

1274 id(self), 

1275 self.class_.__name__, 

1276 with_poly, 

1277 ) 

1278 

1279 def __str__(self): 

1280 if self._is_with_polymorphic: 

1281 return "with_polymorphic(%s, [%s])" % ( 

1282 self._target.__name__, 

1283 ", ".join( 

1284 mp.class_.__name__ 

1285 for mp in self.with_polymorphic_mappers 

1286 if mp is not self.mapper 

1287 ), 

1288 ) 

1289 else: 

1290 return "aliased(%s)" % (self._target.__name__,) 

1291 

1292 

1293class _WrapUserEntity: 

1294 """A wrapper used within the loader_criteria lambda caller so that 

1295 we can bypass declared_attr descriptors on unmapped mixins, which 

1296 normally emit a warning for such use. 

1297 

1298 might also be useful for other per-lambda instrumentations should 

1299 the need arise. 

1300 

1301 """ 

1302 

1303 __slots__ = ("subject",) 

1304 

1305 def __init__(self, subject): 

1306 self.subject = subject 

1307 

1308 @util.preload_module("sqlalchemy.orm.decl_api") 

1309 def __getattribute__(self, name): 

1310 decl_api = util.preloaded.orm.decl_api 

1311 

1312 subject = object.__getattribute__(self, "subject") 

1313 if name in subject.__dict__ and isinstance( 

1314 subject.__dict__[name], decl_api.declared_attr 

1315 ): 

1316 return subject.__dict__[name].fget(subject) 

1317 else: 

1318 return getattr(subject, name) 

1319 

1320 

1321class LoaderCriteriaOption(CriteriaOption): 

1322 """Add additional WHERE criteria to the load for all occurrences of 

1323 a particular entity. 

1324 

1325 :class:`_orm.LoaderCriteriaOption` is invoked using the 

1326 :func:`_orm.with_loader_criteria` function; see that function for 

1327 details. 

1328 

1329 .. versionadded:: 1.4 

1330 

1331 """ 

1332 

1333 __slots__ = ( 

1334 "root_entity", 

1335 "entity", 

1336 "deferred_where_criteria", 

1337 "where_criteria", 

1338 "_where_crit_orig", 

1339 "include_aliases", 

1340 "propagate_to_loaders", 

1341 ) 

1342 

1343 _traverse_internals = [ 

1344 ("root_entity", visitors.ExtendedInternalTraversal.dp_plain_obj), 

1345 ("entity", visitors.ExtendedInternalTraversal.dp_has_cache_key), 

1346 ("where_criteria", visitors.InternalTraversal.dp_clauseelement), 

1347 ("include_aliases", visitors.InternalTraversal.dp_boolean), 

1348 ("propagate_to_loaders", visitors.InternalTraversal.dp_boolean), 

1349 ] 

1350 

1351 root_entity: Optional[Type[Any]] 

1352 entity: Optional[_InternalEntityType[Any]] 

1353 where_criteria: Union[ColumnElement[bool], lambdas.DeferredLambdaElement] 

1354 deferred_where_criteria: bool 

1355 include_aliases: bool 

1356 propagate_to_loaders: bool 

1357 

1358 _where_crit_orig: Any 

1359 

1360 def __init__( 

1361 self, 

1362 entity_or_base: _EntityType[Any], 

1363 where_criteria: Union[ 

1364 _ColumnExpressionArgument[bool], 

1365 Callable[[Any], _ColumnExpressionArgument[bool]], 

1366 ], 

1367 loader_only: bool = False, 

1368 include_aliases: bool = False, 

1369 propagate_to_loaders: bool = True, 

1370 track_closure_variables: bool = True, 

1371 ): 

1372 entity = cast( 

1373 "_InternalEntityType[Any]", 

1374 inspection.inspect(entity_or_base, False), 

1375 ) 

1376 if entity is None: 

1377 self.root_entity = cast("Type[Any]", entity_or_base) 

1378 self.entity = None 

1379 else: 

1380 self.root_entity = None 

1381 self.entity = entity 

1382 

1383 self._where_crit_orig = where_criteria 

1384 if callable(where_criteria): 

1385 if self.root_entity is not None: 

1386 wrap_entity = self.root_entity 

1387 else: 

1388 assert entity is not None 

1389 wrap_entity = entity.entity 

1390 

1391 self.deferred_where_criteria = True 

1392 self.where_criteria = lambdas.DeferredLambdaElement( 

1393 where_criteria, 

1394 roles.WhereHavingRole, 

1395 lambda_args=(_WrapUserEntity(wrap_entity),), 

1396 opts=lambdas.LambdaOptions( 

1397 track_closure_variables=track_closure_variables 

1398 ), 

1399 ) 

1400 else: 

1401 self.deferred_where_criteria = False 

1402 self.where_criteria = coercions.expect( 

1403 roles.WhereHavingRole, where_criteria 

1404 ) 

1405 

1406 self.include_aliases = include_aliases 

1407 self.propagate_to_loaders = propagate_to_loaders 

1408 

1409 @classmethod 

1410 def _unreduce( 

1411 cls, entity, where_criteria, include_aliases, propagate_to_loaders 

1412 ): 

1413 return LoaderCriteriaOption( 

1414 entity, 

1415 where_criteria, 

1416 include_aliases=include_aliases, 

1417 propagate_to_loaders=propagate_to_loaders, 

1418 ) 

1419 

1420 def __reduce__(self): 

1421 return ( 

1422 LoaderCriteriaOption._unreduce, 

1423 ( 

1424 self.entity.class_ if self.entity else self.root_entity, 

1425 self._where_crit_orig, 

1426 self.include_aliases, 

1427 self.propagate_to_loaders, 

1428 ), 

1429 ) 

1430 

1431 def _all_mappers(self) -> Iterator[Mapper[Any]]: 

1432 if self.entity: 

1433 yield from self.entity.mapper.self_and_descendants 

1434 else: 

1435 assert self.root_entity 

1436 stack = list(self.root_entity.__subclasses__()) 

1437 while stack: 

1438 subclass = stack.pop(0) 

1439 ent = cast( 

1440 "_InternalEntityType[Any]", 

1441 inspection.inspect(subclass, raiseerr=False), 

1442 ) 

1443 if ent: 

1444 yield from ent.mapper.self_and_descendants 

1445 else: 

1446 stack.extend(subclass.__subclasses__()) 

1447 

1448 def _should_include(self, compile_state: ORMCompileState) -> bool: 

1449 if ( 

1450 compile_state.select_statement._annotations.get( 

1451 "for_loader_criteria", None 

1452 ) 

1453 is self 

1454 ): 

1455 return False 

1456 return True 

1457 

1458 def _resolve_where_criteria( 

1459 self, ext_info: _InternalEntityType[Any] 

1460 ) -> ColumnElement[bool]: 

1461 if self.deferred_where_criteria: 

1462 crit = cast( 

1463 "ColumnElement[bool]", 

1464 self.where_criteria._resolve_with_args(ext_info.entity), 

1465 ) 

1466 else: 

1467 crit = self.where_criteria # type: ignore 

1468 assert isinstance(crit, ColumnElement) 

1469 return sql_util._deep_annotate( 

1470 crit, 

1471 {"for_loader_criteria": self}, 

1472 detect_subquery_cols=True, 

1473 ind_cols_on_fromclause=True, 

1474 ) 

1475 

1476 def process_compile_state_replaced_entities( 

1477 self, 

1478 compile_state: ORMCompileState, 

1479 mapper_entities: Iterable[_MapperEntity], 

1480 ) -> None: 

1481 self.process_compile_state(compile_state) 

1482 

1483 def process_compile_state(self, compile_state: ORMCompileState) -> None: 

1484 """Apply a modification to a given :class:`.CompileState`.""" 

1485 

1486 # if options to limit the criteria to immediate query only, 

1487 # use compile_state.attributes instead 

1488 

1489 self.get_global_criteria(compile_state.global_attributes) 

1490 

1491 def get_global_criteria(self, attributes: Dict[Any, Any]) -> None: 

1492 for mp in self._all_mappers(): 

1493 load_criteria = attributes.setdefault( 

1494 ("additional_entity_criteria", mp), [] 

1495 ) 

1496 

1497 load_criteria.append(self) 

1498 

1499 

1500inspection._inspects(AliasedClass)(lambda target: target._aliased_insp) 

1501 

1502 

1503@inspection._inspects(type) 

1504def _inspect_mc( 

1505 class_: Type[_O], 

1506) -> Optional[Mapper[_O]]: 

1507 try: 

1508 class_manager = opt_manager_of_class(class_) 

1509 if class_manager is None or not class_manager.is_mapped: 

1510 return None 

1511 mapper = class_manager.mapper 

1512 except exc.NO_STATE: 

1513 return None 

1514 else: 

1515 return mapper 

1516 

1517 

1518GenericAlias = type(List[Any]) 

1519 

1520 

1521@inspection._inspects(GenericAlias) 

1522def _inspect_generic_alias( 

1523 class_: Type[_O], 

1524) -> Optional[Mapper[_O]]: 

1525 origin = cast("Type[_O]", get_origin(class_)) 

1526 return _inspect_mc(origin) 

1527 

1528 

1529@inspection._self_inspects 

1530class Bundle( 

1531 ORMColumnsClauseRole[_T], 

1532 SupportsCloneAnnotations, 

1533 MemoizedHasCacheKey, 

1534 inspection.Inspectable["Bundle[_T]"], 

1535 InspectionAttr, 

1536): 

1537 """A grouping of SQL expressions that are returned by a :class:`.Query` 

1538 under one namespace. 

1539 

1540 The :class:`.Bundle` essentially allows nesting of the tuple-based 

1541 results returned by a column-oriented :class:`_query.Query` object. 

1542 It also 

1543 is extensible via simple subclassing, where the primary capability 

1544 to override is that of how the set of expressions should be returned, 

1545 allowing post-processing as well as custom return types, without 

1546 involving ORM identity-mapped classes. 

1547 

1548 .. seealso:: 

1549 

1550 :ref:`bundles` 

1551 

1552 

1553 """ 

1554 

1555 single_entity = False 

1556 """If True, queries for a single Bundle will be returned as a single 

1557 entity, rather than an element within a keyed tuple.""" 

1558 

1559 is_clause_element = False 

1560 

1561 is_mapper = False 

1562 

1563 is_aliased_class = False 

1564 

1565 is_bundle = True 

1566 

1567 _propagate_attrs: _PropagateAttrsType = util.immutabledict() 

1568 

1569 proxy_set = util.EMPTY_SET # type: ignore 

1570 

1571 exprs: List[_ColumnsClauseElement] 

1572 

1573 def __init__( 

1574 self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any 

1575 ): 

1576 r"""Construct a new :class:`.Bundle`. 

1577 

1578 e.g.:: 

1579 

1580 bn = Bundle("mybundle", MyClass.x, MyClass.y) 

1581 

1582 for row in session.query(bn).filter(bn.c.x == 5).filter(bn.c.y == 4): 

1583 print(row.mybundle.x, row.mybundle.y) 

1584 

1585 :param name: name of the bundle. 

1586 :param \*exprs: columns or SQL expressions comprising the bundle. 

1587 :param single_entity=False: if True, rows for this :class:`.Bundle` 

1588 can be returned as a "single entity" outside of any enclosing tuple 

1589 in the same manner as a mapped entity. 

1590 

1591 """ # noqa: E501 

1592 self.name = self._label = name 

1593 coerced_exprs = [ 

1594 coercions.expect( 

1595 roles.ColumnsClauseRole, expr, apply_propagate_attrs=self 

1596 ) 

1597 for expr in exprs 

1598 ] 

1599 self.exprs = coerced_exprs 

1600 

1601 self.c = self.columns = ColumnCollection( 

1602 (getattr(col, "key", col._label), col) 

1603 for col in [e._annotations.get("bundle", e) for e in coerced_exprs] 

1604 ).as_readonly() 

1605 self.single_entity = kw.pop("single_entity", self.single_entity) 

1606 

1607 def _gen_cache_key( 

1608 self, anon_map: anon_map, bindparams: List[BindParameter[Any]] 

1609 ) -> Tuple[Any, ...]: 

1610 return (self.__class__, self.name, self.single_entity) + tuple( 

1611 [expr._gen_cache_key(anon_map, bindparams) for expr in self.exprs] 

1612 ) 

1613 

1614 @property 

1615 def mapper(self) -> Optional[Mapper[Any]]: 

1616 mp: Optional[Mapper[Any]] = self.exprs[0]._annotations.get( 

1617 "parentmapper", None 

1618 ) 

1619 return mp 

1620 

1621 @property 

1622 def entity(self) -> Optional[_InternalEntityType[Any]]: 

1623 ie: Optional[_InternalEntityType[Any]] = self.exprs[ 

1624 0 

1625 ]._annotations.get("parententity", None) 

1626 return ie 

1627 

1628 @property 

1629 def entity_namespace( 

1630 self, 

1631 ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]: 

1632 return self.c 

1633 

1634 columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]] 

1635 

1636 """A namespace of SQL expressions referred to by this :class:`.Bundle`. 

1637 

1638 e.g.:: 

1639 

1640 bn = Bundle("mybundle", MyClass.x, MyClass.y) 

1641 

1642 q = sess.query(bn).filter(bn.c.x == 5) 

1643 

1644 Nesting of bundles is also supported:: 

1645 

1646 b1 = Bundle( 

1647 "b1", 

1648 Bundle("b2", MyClass.a, MyClass.b), 

1649 Bundle("b3", MyClass.x, MyClass.y), 

1650 ) 

1651 

1652 q = sess.query(b1).filter(b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9) 

1653 

1654 .. seealso:: 

1655 

1656 :attr:`.Bundle.c` 

1657 

1658 """ # noqa: E501 

1659 

1660 c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]] 

1661 """An alias for :attr:`.Bundle.columns`.""" 

1662 

1663 def _clone(self, **kw): 

1664 cloned = self.__class__.__new__(self.__class__) 

1665 cloned.__dict__.update(self.__dict__) 

1666 return cloned 

1667 

1668 def __clause_element__(self): 

1669 # ensure existing entity_namespace remains 

1670 annotations = {"bundle": self, "entity_namespace": self} 

1671 annotations.update(self._annotations) 

1672 

1673 plugin_subject = self.exprs[0]._propagate_attrs.get( 

1674 "plugin_subject", self.entity 

1675 ) 

1676 return ( 

1677 expression.ClauseList( 

1678 _literal_as_text_role=roles.ColumnsClauseRole, 

1679 group=False, 

1680 *[e._annotations.get("bundle", e) for e in self.exprs], 

1681 ) 

1682 ._annotate(annotations) 

1683 ._set_propagate_attrs( 

1684 # the Bundle *must* use the orm plugin no matter what. the 

1685 # subject can be None but it's much better if it's not. 

1686 { 

1687 "compile_state_plugin": "orm", 

1688 "plugin_subject": plugin_subject, 

1689 } 

1690 ) 

1691 ) 

1692 

1693 @property 

1694 def clauses(self): 

1695 return self.__clause_element__().clauses 

1696 

1697 def label(self, name): 

1698 """Provide a copy of this :class:`.Bundle` passing a new label.""" 

1699 

1700 cloned = self._clone() 

1701 cloned.name = name 

1702 return cloned 

1703 

1704 def create_row_processor( 

1705 self, 

1706 query: Select[Any], 

1707 procs: Sequence[Callable[[Row[Any]], Any]], 

1708 labels: Sequence[str], 

1709 ) -> Callable[[Row[Any]], Any]: 

1710 """Produce the "row processing" function for this :class:`.Bundle`. 

1711 

1712 May be overridden by subclasses to provide custom behaviors when 

1713 results are fetched. The method is passed the statement object and a 

1714 set of "row processor" functions at query execution time; these 

1715 processor functions when given a result row will return the individual 

1716 attribute value, which can then be adapted into any kind of return data 

1717 structure. 

1718 

1719 The example below illustrates replacing the usual :class:`.Row` 

1720 return structure with a straight Python dictionary:: 

1721 

1722 from sqlalchemy.orm import Bundle 

1723 

1724 

1725 class DictBundle(Bundle): 

1726 def create_row_processor(self, query, procs, labels): 

1727 "Override create_row_processor to return values as dictionaries" 

1728 

1729 def proc(row): 

1730 return dict(zip(labels, (proc(row) for proc in procs))) 

1731 

1732 return proc 

1733 

1734 A result from the above :class:`_orm.Bundle` will return dictionary 

1735 values:: 

1736 

1737 bn = DictBundle("mybundle", MyClass.data1, MyClass.data2) 

1738 for row in session.execute(select(bn)).where(bn.c.data1 == "d1"): 

1739 print(row.mybundle["data1"], row.mybundle["data2"]) 

1740 

1741 """ # noqa: E501 

1742 keyed_tuple = result_tuple(labels, [() for l in labels]) 

1743 

1744 def proc(row: Row[Any]) -> Any: 

1745 return keyed_tuple([proc(row) for proc in procs]) 

1746 

1747 return proc 

1748 

1749 

1750def _orm_annotate(element: _SA, exclude: Optional[Any] = None) -> _SA: 

1751 """Deep copy the given ClauseElement, annotating each element with the 

1752 "_orm_adapt" flag. 

1753 

1754 Elements within the exclude collection will be cloned but not annotated. 

1755 

1756 """ 

1757 return sql_util._deep_annotate(element, {"_orm_adapt": True}, exclude) 

1758 

1759 

1760def _orm_deannotate(element: _SA) -> _SA: 

1761 """Remove annotations that link a column to a particular mapping. 

1762 

1763 Note this doesn't affect "remote" and "foreign" annotations 

1764 passed by the :func:`_orm.foreign` and :func:`_orm.remote` 

1765 annotators. 

1766 

1767 """ 

1768 

1769 return sql_util._deep_deannotate( 

1770 element, values=("_orm_adapt", "parententity") 

1771 ) 

1772 

1773 

1774def _orm_full_deannotate(element: _SA) -> _SA: 

1775 return sql_util._deep_deannotate(element) 

1776 

1777 

1778class _ORMJoin(expression.Join): 

1779 """Extend Join to support ORM constructs as input.""" 

1780 

1781 __visit_name__ = expression.Join.__visit_name__ 

1782 

1783 inherit_cache = True 

1784 

1785 def __init__( 

1786 self, 

1787 left: _FromClauseArgument, 

1788 right: _FromClauseArgument, 

1789 onclause: Optional[_OnClauseArgument] = None, 

1790 isouter: bool = False, 

1791 full: bool = False, 

1792 _left_memo: Optional[Any] = None, 

1793 _right_memo: Optional[Any] = None, 

1794 _extra_criteria: Tuple[ColumnElement[bool], ...] = (), 

1795 ): 

1796 left_info = cast( 

1797 "Union[FromClause, _InternalEntityType[Any]]", 

1798 inspection.inspect(left), 

1799 ) 

1800 

1801 right_info = cast( 

1802 "Union[FromClause, _InternalEntityType[Any]]", 

1803 inspection.inspect(right), 

1804 ) 

1805 adapt_to = right_info.selectable 

1806 

1807 # used by joined eager loader 

1808 self._left_memo = _left_memo 

1809 self._right_memo = _right_memo 

1810 

1811 if isinstance(onclause, attributes.QueryableAttribute): 

1812 if TYPE_CHECKING: 

1813 assert isinstance( 

1814 onclause.comparator, RelationshipProperty.Comparator 

1815 ) 

1816 on_selectable = onclause.comparator._source_selectable() 

1817 prop = onclause.property 

1818 _extra_criteria += onclause._extra_criteria 

1819 elif isinstance(onclause, MapperProperty): 

1820 # used internally by joined eager loader...possibly not ideal 

1821 prop = onclause 

1822 on_selectable = prop.parent.selectable 

1823 else: 

1824 prop = None 

1825 on_selectable = None 

1826 

1827 left_selectable = left_info.selectable 

1828 if prop: 

1829 adapt_from: Optional[FromClause] 

1830 if sql_util.clause_is_present(on_selectable, left_selectable): 

1831 adapt_from = on_selectable 

1832 else: 

1833 assert isinstance(left_selectable, FromClause) 

1834 adapt_from = left_selectable 

1835 

1836 ( 

1837 pj, 

1838 sj, 

1839 source, 

1840 dest, 

1841 secondary, 

1842 target_adapter, 

1843 ) = prop._create_joins( 

1844 source_selectable=adapt_from, 

1845 dest_selectable=adapt_to, 

1846 source_polymorphic=True, 

1847 of_type_entity=right_info, 

1848 alias_secondary=True, 

1849 extra_criteria=_extra_criteria, 

1850 ) 

1851 

1852 if sj is not None: 

1853 if isouter: 

1854 # note this is an inner join from secondary->right 

1855 right = sql.join(secondary, right, sj) 

1856 onclause = pj 

1857 else: 

1858 left = sql.join(left, secondary, pj, isouter) 

1859 onclause = sj 

1860 else: 

1861 onclause = pj 

1862 

1863 self._target_adapter = target_adapter 

1864 

1865 # we don't use the normal coercions logic for _ORMJoin 

1866 # (probably should), so do some gymnastics to get the entity. 

1867 # logic here is for #8721, which was a major bug in 1.4 

1868 # for almost two years, not reported/fixed until 1.4.43 (!) 

1869 if is_selectable(left_info): 

1870 parententity = left_selectable._annotations.get( 

1871 "parententity", None 

1872 ) 

1873 elif insp_is_mapper(left_info) or insp_is_aliased_class(left_info): 

1874 parententity = left_info 

1875 else: 

1876 parententity = None 

1877 

1878 if parententity is not None: 

1879 self._annotations = self._annotations.union( 

1880 {"parententity": parententity} 

1881 ) 

1882 

1883 augment_onclause = bool(_extra_criteria) and not prop 

1884 expression.Join.__init__(self, left, right, onclause, isouter, full) 

1885 

1886 assert self.onclause is not None 

1887 

1888 if augment_onclause: 

1889 self.onclause &= sql.and_(*_extra_criteria) 

1890 

1891 if ( 

1892 not prop 

1893 and getattr(right_info, "mapper", None) 

1894 and right_info.mapper.single # type: ignore 

1895 ): 

1896 right_info = cast("_InternalEntityType[Any]", right_info) 

1897 # if single inheritance target and we are using a manual 

1898 # or implicit ON clause, augment it the same way we'd augment the 

1899 # WHERE. 

1900 single_crit = right_info.mapper._single_table_criterion 

1901 if single_crit is not None: 

1902 if insp_is_aliased_class(right_info): 

1903 single_crit = right_info._adapter.traverse(single_crit) 

1904 self.onclause = self.onclause & single_crit 

1905 

1906 def _splice_into_center(self, other): 

1907 """Splice a join into the center. 

1908 

1909 Given join(a, b) and join(b, c), return join(a, b).join(c) 

1910 

1911 """ 

1912 leftmost = other 

1913 while isinstance(leftmost, sql.Join): 

1914 leftmost = leftmost.left 

1915 

1916 assert self.right is leftmost 

1917 

1918 left = _ORMJoin( 

1919 self.left, 

1920 other.left, 

1921 self.onclause, 

1922 isouter=self.isouter, 

1923 _left_memo=self._left_memo, 

1924 _right_memo=other._left_memo._path_registry, 

1925 ) 

1926 

1927 return _ORMJoin( 

1928 left, 

1929 other.right, 

1930 other.onclause, 

1931 isouter=other.isouter, 

1932 _right_memo=other._right_memo, 

1933 ) 

1934 

1935 def join( 

1936 self, 

1937 right: _FromClauseArgument, 

1938 onclause: Optional[_OnClauseArgument] = None, 

1939 isouter: bool = False, 

1940 full: bool = False, 

1941 ) -> _ORMJoin: 

1942 return _ORMJoin(self, right, onclause, full=full, isouter=isouter) 

1943 

1944 def outerjoin( 

1945 self, 

1946 right: _FromClauseArgument, 

1947 onclause: Optional[_OnClauseArgument] = None, 

1948 full: bool = False, 

1949 ) -> _ORMJoin: 

1950 return _ORMJoin(self, right, onclause, isouter=True, full=full) 

1951 

1952 

1953def with_parent( 

1954 instance: object, 

1955 prop: attributes.QueryableAttribute[Any], 

1956 from_entity: Optional[_EntityType[Any]] = None, 

1957) -> ColumnElement[bool]: 

1958 """Create filtering criterion that relates this query's primary entity 

1959 to the given related instance, using established 

1960 :func:`_orm.relationship()` 

1961 configuration. 

1962 

1963 E.g.:: 

1964 

1965 stmt = select(Address).where(with_parent(some_user, User.addresses)) 

1966 

1967 The SQL rendered is the same as that rendered when a lazy loader 

1968 would fire off from the given parent on that attribute, meaning 

1969 that the appropriate state is taken from the parent object in 

1970 Python without the need to render joins to the parent table 

1971 in the rendered statement. 

1972 

1973 The given property may also make use of :meth:`_orm.PropComparator.of_type` 

1974 to indicate the left side of the criteria:: 

1975 

1976 

1977 a1 = aliased(Address) 

1978 a2 = aliased(Address) 

1979 stmt = select(a1, a2).where(with_parent(u1, User.addresses.of_type(a2))) 

1980 

1981 The above use is equivalent to using the 

1982 :func:`_orm.with_parent.from_entity` argument:: 

1983 

1984 a1 = aliased(Address) 

1985 a2 = aliased(Address) 

1986 stmt = select(a1, a2).where( 

1987 with_parent(u1, User.addresses, from_entity=a2) 

1988 ) 

1989 

1990 :param instance: 

1991 An instance which has some :func:`_orm.relationship`. 

1992 

1993 :param property: 

1994 Class-bound attribute, which indicates 

1995 what relationship from the instance should be used to reconcile the 

1996 parent/child relationship. 

1997 

1998 :param from_entity: 

1999 Entity in which to consider as the left side. This defaults to the 

2000 "zero" entity of the :class:`_query.Query` itself. 

2001 

2002 .. versionadded:: 1.2 

2003 

2004 """ # noqa: E501 

2005 prop_t: RelationshipProperty[Any] 

2006 

2007 if isinstance(prop, str): 

2008 raise sa_exc.ArgumentError( 

2009 "with_parent() accepts class-bound mapped attributes, not strings" 

2010 ) 

2011 elif isinstance(prop, attributes.QueryableAttribute): 

2012 if prop._of_type: 

2013 from_entity = prop._of_type 

2014 mapper_property = prop.property 

2015 if mapper_property is None or not prop_is_relationship( 

2016 mapper_property 

2017 ): 

2018 raise sa_exc.ArgumentError( 

2019 f"Expected relationship property for with_parent(), " 

2020 f"got {mapper_property}" 

2021 ) 

2022 prop_t = mapper_property 

2023 else: 

2024 prop_t = prop 

2025 

2026 return prop_t._with_parent(instance, from_entity=from_entity) 

2027 

2028 

2029def has_identity(object_: object) -> bool: 

2030 """Return True if the given object has a database 

2031 identity. 

2032 

2033 This typically corresponds to the object being 

2034 in either the persistent or detached state. 

2035 

2036 .. seealso:: 

2037 

2038 :func:`.was_deleted` 

2039 

2040 """ 

2041 state = attributes.instance_state(object_) 

2042 return state.has_identity 

2043 

2044 

2045def was_deleted(object_: object) -> bool: 

2046 """Return True if the given object was deleted 

2047 within a session flush. 

2048 

2049 This is regardless of whether or not the object is 

2050 persistent or detached. 

2051 

2052 .. seealso:: 

2053 

2054 :attr:`.InstanceState.was_deleted` 

2055 

2056 """ 

2057 

2058 state = attributes.instance_state(object_) 

2059 return state.was_deleted 

2060 

2061 

2062def _entity_corresponds_to( 

2063 given: _InternalEntityType[Any], entity: _InternalEntityType[Any] 

2064) -> bool: 

2065 """determine if 'given' corresponds to 'entity', in terms 

2066 of an entity passed to Query that would match the same entity 

2067 being referred to elsewhere in the query. 

2068 

2069 """ 

2070 if insp_is_aliased_class(entity): 

2071 if insp_is_aliased_class(given): 

2072 if entity._base_alias() is given._base_alias(): 

2073 return True 

2074 return False 

2075 elif insp_is_aliased_class(given): 

2076 if given._use_mapper_path: 

2077 return entity in given.with_polymorphic_mappers 

2078 else: 

2079 return entity is given 

2080 

2081 assert insp_is_mapper(given) 

2082 return entity.common_parent(given) 

2083 

2084 

2085def _entity_corresponds_to_use_path_impl( 

2086 given: _InternalEntityType[Any], entity: _InternalEntityType[Any] 

2087) -> bool: 

2088 """determine if 'given' corresponds to 'entity', in terms 

2089 of a path of loader options where a mapped attribute is taken to 

2090 be a member of a parent entity. 

2091 

2092 e.g.:: 

2093 

2094 someoption(A).someoption(A.b) # -> fn(A, A) -> True 

2095 someoption(A).someoption(C.d) # -> fn(A, C) -> False 

2096 

2097 a1 = aliased(A) 

2098 someoption(a1).someoption(A.b) # -> fn(a1, A) -> False 

2099 someoption(a1).someoption(a1.b) # -> fn(a1, a1) -> True 

2100 

2101 wp = with_polymorphic(A, [A1, A2]) 

2102 someoption(wp).someoption(A1.foo) # -> fn(wp, A1) -> False 

2103 someoption(wp).someoption(wp.A1.foo) # -> fn(wp, wp.A1) -> True 

2104 

2105 """ 

2106 if insp_is_aliased_class(given): 

2107 return ( 

2108 insp_is_aliased_class(entity) 

2109 and not entity._use_mapper_path 

2110 and (given is entity or entity in given._with_polymorphic_entities) 

2111 ) 

2112 elif not insp_is_aliased_class(entity): 

2113 return given.isa(entity.mapper) 

2114 else: 

2115 return ( 

2116 entity._use_mapper_path 

2117 and given in entity.with_polymorphic_mappers 

2118 ) 

2119 

2120 

2121def _entity_isa(given: _InternalEntityType[Any], mapper: Mapper[Any]) -> bool: 

2122 """determine if 'given' "is a" mapper, in terms of the given 

2123 would load rows of type 'mapper'. 

2124 

2125 """ 

2126 if given.is_aliased_class: 

2127 return mapper in given.with_polymorphic_mappers or given.mapper.isa( 

2128 mapper 

2129 ) 

2130 elif given.with_polymorphic_mappers: 

2131 return mapper in given.with_polymorphic_mappers or given.isa(mapper) 

2132 else: 

2133 return given.isa(mapper) 

2134 

2135 

2136def _getitem(iterable_query: Query[Any], item: Any) -> Any: 

2137 """calculate __getitem__ in terms of an iterable query object 

2138 that also has a slice() method. 

2139 

2140 """ 

2141 

2142 def _no_negative_indexes(): 

2143 raise IndexError( 

2144 "negative indexes are not accepted by SQL " 

2145 "index / slice operators" 

2146 ) 

2147 

2148 if isinstance(item, slice): 

2149 start, stop, step = util.decode_slice(item) 

2150 

2151 if ( 

2152 isinstance(stop, int) 

2153 and isinstance(start, int) 

2154 and stop - start <= 0 

2155 ): 

2156 return [] 

2157 

2158 elif (isinstance(start, int) and start < 0) or ( 

2159 isinstance(stop, int) and stop < 0 

2160 ): 

2161 _no_negative_indexes() 

2162 

2163 res = iterable_query.slice(start, stop) 

2164 if step is not None: 

2165 return list(res)[None : None : item.step] 

2166 else: 

2167 return list(res) 

2168 else: 

2169 if item == -1: 

2170 _no_negative_indexes() 

2171 else: 

2172 return list(iterable_query[item : item + 1])[0] 

2173 

2174 

2175def _is_mapped_annotation( 

2176 raw_annotation: _AnnotationScanType, 

2177 cls: Type[Any], 

2178 originating_cls: Type[Any], 

2179) -> bool: 

2180 try: 

2181 annotated = de_stringify_annotation( 

2182 cls, raw_annotation, originating_cls.__module__ 

2183 ) 

2184 except NameError: 

2185 # in most cases, at least within our own tests, we can raise 

2186 # here, which is more accurate as it prevents us from returning 

2187 # false negatives. However, in the real world, try to avoid getting 

2188 # involved with end-user annotations that have nothing to do with us. 

2189 # see issue #8888 where we bypass using this function in the case 

2190 # that we want to detect an unresolvable Mapped[] type. 

2191 return False 

2192 else: 

2193 return is_origin_of_cls(annotated, _MappedAnnotationBase) 

2194 

2195 

2196class _CleanupError(Exception): 

2197 pass 

2198 

2199 

2200def _cleanup_mapped_str_annotation( 

2201 annotation: str, originating_module: str 

2202) -> str: 

2203 # fix up an annotation that comes in as the form: 

2204 # 'Mapped[List[Address]]' so that it instead looks like: 

2205 # 'Mapped[List["Address"]]' , which will allow us to get 

2206 # "Address" as a string 

2207 

2208 # additionally, resolve symbols for these names since this is where 

2209 # we'd have to do it 

2210 

2211 inner: Optional[Match[str]] 

2212 

2213 mm = re.match(r"^([^ \|]+?)\[(.+)\]$", annotation) 

2214 

2215 if not mm: 

2216 return annotation 

2217 

2218 # ticket #8759. Resolve the Mapped name to a real symbol. 

2219 # originally this just checked the name. 

2220 try: 

2221 obj = eval_name_only(mm.group(1), originating_module) 

2222 except NameError as ne: 

2223 raise _CleanupError( 

2224 f'For annotation "{annotation}", could not resolve ' 

2225 f'container type "{mm.group(1)}". ' 

2226 "Please ensure this type is imported at the module level " 

2227 "outside of TYPE_CHECKING blocks" 

2228 ) from ne 

2229 

2230 if obj is typing.ClassVar: 

2231 real_symbol = "ClassVar" 

2232 else: 

2233 try: 

2234 if issubclass(obj, _MappedAnnotationBase): 

2235 real_symbol = obj.__name__ 

2236 else: 

2237 return annotation 

2238 except TypeError: 

2239 # avoid isinstance(obj, type) check, just catch TypeError 

2240 return annotation 

2241 

2242 # note: if one of the codepaths above didn't define real_symbol and 

2243 # then didn't return, real_symbol raises UnboundLocalError 

2244 # which is actually a NameError, and the calling routines don't 

2245 # notice this since they are catching NameError anyway. Just in case 

2246 # this is being modified in the future, something to be aware of. 

2247 

2248 stack = [] 

2249 inner = mm 

2250 while True: 

2251 stack.append(real_symbol if mm is inner else inner.group(1)) 

2252 g2 = inner.group(2) 

2253 inner = re.match(r"^([^ \|]+?)\[(.+)\]$", g2) 

2254 if inner is None: 

2255 stack.append(g2) 

2256 break 

2257 

2258 # stacks we want to rewrite, that is, quote the last entry which 

2259 # we think is a relationship class name: 

2260 # 

2261 # ['Mapped', 'List', 'Address'] 

2262 # ['Mapped', 'A'] 

2263 # 

2264 # stacks we dont want to rewrite, which are generally MappedColumn 

2265 # use cases: 

2266 # 

2267 # ['Mapped', "'Optional[Dict[str, str]]'"] 

2268 # ['Mapped', 'dict[str, str] | None'] 

2269 

2270 if ( 

2271 # avoid already quoted symbols such as 

2272 # ['Mapped', "'Optional[Dict[str, str]]'"] 

2273 not re.match(r"""^["'].*["']$""", stack[-1]) 

2274 # avoid further generics like Dict[] such as 

2275 # ['Mapped', 'dict[str, str] | None'], 

2276 # ['Mapped', 'list[int] | list[str]'], 

2277 # ['Mapped', 'Union[list[int], list[str]]'], 

2278 and not re.search(r"[\[\]]", stack[-1]) 

2279 ): 

2280 stripchars = "\"' " 

2281 stack[-1] = ", ".join( 

2282 f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",") 

2283 ) 

2284 

2285 annotation = "[".join(stack) + ("]" * (len(stack) - 1)) 

2286 

2287 return annotation 

2288 

2289 

2290def _extract_mapped_subtype( 

2291 raw_annotation: Optional[_AnnotationScanType], 

2292 cls: type, 

2293 originating_module: str, 

2294 key: str, 

2295 attr_cls: Type[Any], 

2296 required: bool, 

2297 is_dataclass_field: bool, 

2298 expect_mapped: bool = True, 

2299 raiseerr: bool = True, 

2300) -> Optional[Tuple[Union[_AnnotationScanType, str], Optional[type]]]: 

2301 """given an annotation, figure out if it's ``Mapped[something]`` and if 

2302 so, return the ``something`` part. 

2303 

2304 Includes error raise scenarios and other options. 

2305 

2306 """ 

2307 

2308 if raw_annotation is None: 

2309 if required: 

2310 raise orm_exc.MappedAnnotationError( 

2311 f"Python typing annotation is required for attribute " 

2312 f'"{cls.__name__}.{key}" when primary argument(s) for ' 

2313 f'"{attr_cls.__name__}" construct are None or not present' 

2314 ) 

2315 return None 

2316 

2317 try: 

2318 # destringify the "outside" of the annotation. note we are not 

2319 # adding include_generic so it will *not* dig into generic contents, 

2320 # which will remain as ForwardRef or plain str under future annotations 

2321 # mode. The full destringify happens later when mapped_column goes 

2322 # to do a full lookup in the registry type_annotations_map. 

2323 annotated = de_stringify_annotation( 

2324 cls, 

2325 raw_annotation, 

2326 originating_module, 

2327 str_cleanup_fn=_cleanup_mapped_str_annotation, 

2328 ) 

2329 except _CleanupError as ce: 

2330 raise orm_exc.MappedAnnotationError( 

2331 f"Could not interpret annotation {raw_annotation}. " 

2332 "Check that it uses names that are correctly imported at the " 

2333 "module level. See chained stack trace for more hints." 

2334 ) from ce 

2335 except NameError as ne: 

2336 if raiseerr and "Mapped[" in raw_annotation: # type: ignore 

2337 raise orm_exc.MappedAnnotationError( 

2338 f"Could not interpret annotation {raw_annotation}. " 

2339 "Check that it uses names that are correctly imported at the " 

2340 "module level. See chained stack trace for more hints." 

2341 ) from ne 

2342 

2343 annotated = raw_annotation # type: ignore 

2344 

2345 if is_dataclass_field: 

2346 return annotated, None 

2347 else: 

2348 if not hasattr(annotated, "__origin__") or not is_origin_of_cls( 

2349 annotated, _MappedAnnotationBase 

2350 ): 

2351 if expect_mapped: 

2352 if not raiseerr: 

2353 return None 

2354 

2355 origin = getattr(annotated, "__origin__", None) 

2356 if origin is typing.ClassVar: 

2357 return None 

2358 

2359 # check for other kind of ORM descriptor like AssociationProxy, 

2360 # don't raise for that (issue #9957) 

2361 elif isinstance(origin, type) and issubclass( 

2362 origin, ORMDescriptor 

2363 ): 

2364 return None 

2365 

2366 raise orm_exc.MappedAnnotationError( 

2367 f'Type annotation for "{cls.__name__}.{key}" ' 

2368 "can't be correctly interpreted for " 

2369 "Annotated Declarative Table form. ORM annotations " 

2370 "should normally make use of the ``Mapped[]`` generic " 

2371 "type, or other ORM-compatible generic type, as a " 

2372 "container for the actual type, which indicates the " 

2373 "intent that the attribute is mapped. " 

2374 "Class variables that are not intended to be mapped " 

2375 "by the ORM should use ClassVar[]. " 

2376 "To allow Annotated Declarative to disregard legacy " 

2377 "annotations which don't use Mapped[] to pass, set " 

2378 '"__allow_unmapped__ = True" on the class or a ' 

2379 "superclass this class.", 

2380 code="zlpr", 

2381 ) 

2382 

2383 else: 

2384 return annotated, None 

2385 

2386 if len(annotated.__args__) != 1: 

2387 raise orm_exc.MappedAnnotationError( 

2388 "Expected sub-type for Mapped[] annotation" 

2389 ) 

2390 

2391 return ( 

2392 # fix dict/list/set args to be ForwardRef, see #11814 

2393 fixup_container_fwd_refs(annotated.__args__[0]), 

2394 annotated.__origin__, 

2395 ) 

2396 

2397 

2398def _mapper_property_as_plain_name(prop: Type[Any]) -> str: 

2399 if hasattr(prop, "_mapper_property_name"): 

2400 name = prop._mapper_property_name() 

2401 else: 

2402 name = None 

2403 return util.clsname_as_plain_name(prop, name)