Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/util.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

842 statements  

1# orm/util.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9from __future__ import annotations 

10 

11import enum 

12import functools 

13import re 

14import types 

15import typing 

16from typing import AbstractSet 

17from typing import Any 

18from typing import Callable 

19from typing import cast 

20from typing import Dict 

21from typing import FrozenSet 

22from typing import Generic 

23from typing import get_origin 

24from typing import Iterable 

25from typing import Iterator 

26from typing import List 

27from typing import Literal 

28from typing import Match 

29from typing import Optional 

30from typing import Protocol 

31from typing import Sequence 

32from typing import Tuple 

33from typing import Type 

34from typing import TYPE_CHECKING 

35from typing import TypeVar 

36from typing import Union 

37import weakref 

38 

39from . import attributes # noqa 

40from . import exc as orm_exc 

41from ._typing import _O 

42from ._typing import insp_is_aliased_class 

43from ._typing import insp_is_mapper 

44from ._typing import prop_is_relationship 

45from .base import _class_to_mapper as _class_to_mapper 

46from .base import _MappedAnnotationBase 

47from .base import _never_set as _never_set # noqa: F401 

48from .base import _none_only_set as _none_only_set # noqa: F401 

49from .base import _none_set as _none_set # noqa: F401 

50from .base import attribute_str as attribute_str # noqa: F401 

51from .base import class_mapper as class_mapper 

52from .base import DynamicMapped 

53from .base import InspectionAttr as InspectionAttr 

54from .base import instance_str as instance_str # noqa: F401 

55from .base import Mapped 

56from .base import object_mapper as object_mapper 

57from .base import object_state as object_state # noqa: F401 

58from .base import opt_manager_of_class 

59from .base import ORMDescriptor 

60from .base import state_attribute_str as state_attribute_str # noqa: F401 

61from .base import state_class_str as state_class_str # noqa: F401 

62from .base import state_str as state_str # noqa: F401 

63from .base import WriteOnlyMapped 

64from .interfaces import CriteriaOption 

65from .interfaces import MapperProperty as MapperProperty 

66from .interfaces import ORMColumnsClauseRole 

67from .interfaces import ORMEntityColumnsClauseRole 

68from .interfaces import ORMFromClauseRole 

69from .path_registry import PathRegistry as PathRegistry 

70from .. import event 

71from .. import exc as sa_exc 

72from .. import inspection 

73from .. import sql 

74from .. import util 

75from ..engine.result import result_tuple 

76from ..sql import coercions 

77from ..sql import expression 

78from ..sql import lambdas 

79from ..sql import roles 

80from ..sql import util as sql_util 

81from ..sql import visitors 

82from ..sql._typing import is_selectable 

83from ..sql.annotation import SupportsCloneAnnotations 

84from ..sql.base import WriteableColumnCollection 

85from ..sql.cache_key import HasCacheKey 

86from ..sql.cache_key import MemoizedHasCacheKey 

87from ..sql.elements import ColumnElement 

88from ..sql.elements import KeyedColumnElement 

89from ..sql.schema import MetaData 

90from ..sql.selectable import FromClause 

91from ..util.langhelpers import MemoizedSlots 

92from ..util.typing import de_stringify_annotation as _de_stringify_annotation 

93from ..util.typing import eval_name_only as _eval_name_only 

94from ..util.typing import fixup_container_fwd_refs 

95from ..util.typing import GenericProtocol 

96from ..util.typing import is_origin_of_cls 

97from ..util.typing import TupleAny 

98from ..util.typing import Unpack 

99 

100if typing.TYPE_CHECKING: 

101 from ._typing import _EntityType 

102 from ._typing import _IdentityKeyType 

103 from ._typing import _InternalEntityType 

104 from ._typing import _ORMCOLEXPR 

105 from .context import _MapperEntity 

106 from .context import _ORMCompileState 

107 from .decl_api import RegistryType 

108 from .mapper import Mapper 

109 from .path_registry import _AbstractEntityRegistry 

110 from .query import Query 

111 from .relationships import RelationshipProperty 

112 from ..engine import Row 

113 from ..engine import RowMapping 

114 from ..sql._typing import _CE 

115 from ..sql._typing import _ColumnExpressionArgument 

116 from ..sql._typing import _EquivalentColumnMap 

117 from ..sql._typing import _FromClauseArgument 

118 from ..sql._typing import _OnClauseArgument 

119 from ..sql._typing import _PropagateAttrsType 

120 from ..sql.annotation import _SA 

121 from ..sql.base import ReadOnlyColumnCollection 

122 from ..sql.elements import BindParameter 

123 from ..sql.selectable import _ColumnsClauseElement 

124 from ..sql.selectable import Select 

125 from ..sql.selectable import Selectable 

126 from ..sql.visitors import anon_map 

127 from ..util.typing import _AnnotationScanType 

128 from ..util.typing import _MatchedOnType 

129 

130_T = TypeVar("_T", bound=Any) 

131 

132all_cascades = frozenset( 

133 ( 

134 "delete", 

135 "delete-orphan", 

136 "all", 

137 "merge", 

138 "expunge", 

139 "save-update", 

140 "refresh-expire", 

141 "none", 

142 ) 

143) 

144 

145_de_stringify_partial = functools.partial( 

146 functools.partial, 

147 locals_=util.immutabledict( 

148 { 

149 "Mapped": Mapped, 

150 "WriteOnlyMapped": WriteOnlyMapped, 

151 "DynamicMapped": DynamicMapped, 

152 } 

153 ), 

154) 

155 

156# partial is practically useless as we have to write out the whole 

157# function and maintain the signature anyway 

158 

159 

160class _DeStringifyAnnotation(Protocol): 

161 def __call__( 

162 self, 

163 cls: Type[Any], 

164 annotation: _AnnotationScanType, 

165 originating_module: str, 

166 *, 

167 str_cleanup_fn: Optional[Callable[[str, str], str]] = None, 

168 include_generic: bool = False, 

169 ) -> _MatchedOnType: ... 

170 

171 

172de_stringify_annotation = cast( 

173 _DeStringifyAnnotation, _de_stringify_partial(_de_stringify_annotation) 

174) 

175 

176 

177class _EvalNameOnly(Protocol): 

178 def __call__(self, name: str, module_name: str) -> Any: ... 

179 

180 

181eval_name_only = cast(_EvalNameOnly, _de_stringify_partial(_eval_name_only)) 

182 

183 

184class CascadeOptions(FrozenSet[str]): 

185 """Keeps track of the options sent to 

186 :paramref:`.relationship.cascade`""" 

187 

188 _add_w_all_cascades = all_cascades.difference( 

189 ["all", "none", "delete-orphan"] 

190 ) 

191 _allowed_cascades = all_cascades 

192 

193 _viewonly_cascades = ["expunge", "all", "none", "refresh-expire", "merge"] 

194 

195 __slots__ = ( 

196 "save_update", 

197 "delete", 

198 "refresh_expire", 

199 "merge", 

200 "expunge", 

201 "delete_orphan", 

202 ) 

203 

204 save_update: bool 

205 delete: bool 

206 refresh_expire: bool 

207 merge: bool 

208 expunge: bool 

209 delete_orphan: bool 

210 

211 def __new__( 

212 cls, value_list: Optional[Union[Iterable[str], str]] 

213 ) -> CascadeOptions: 

214 if isinstance(value_list, str) or value_list is None: 

215 return cls.from_string(value_list) # type: ignore 

216 values = set(value_list) 

217 if values.difference(cls._allowed_cascades): 

218 raise sa_exc.ArgumentError( 

219 "Invalid cascade option(s): %s" 

220 % ", ".join( 

221 [ 

222 repr(x) 

223 for x in sorted( 

224 values.difference(cls._allowed_cascades) 

225 ) 

226 ] 

227 ) 

228 ) 

229 

230 if "all" in values: 

231 values.update(cls._add_w_all_cascades) 

232 if "none" in values: 

233 values.clear() 

234 values.discard("all") 

235 

236 self = super().__new__(cls, values) 

237 self.save_update = "save-update" in values 

238 self.delete = "delete" in values 

239 self.refresh_expire = "refresh-expire" in values 

240 self.merge = "merge" in values 

241 self.expunge = "expunge" in values 

242 self.delete_orphan = "delete-orphan" in values 

243 

244 if self.delete_orphan and not self.delete: 

245 util.warn("The 'delete-orphan' cascade option requires 'delete'.") 

246 return self 

247 

248 def __repr__(self): 

249 return "CascadeOptions(%r)" % (",".join([x for x in sorted(self)])) 

250 

251 @classmethod 

252 def from_string(cls, arg): 

253 values = [c for c in re.split(r"\s*,\s*", arg or "") if c] 

254 return cls(values) 

255 

256 

257def _metadata_for_cls(cls: Type[Any], registry: RegistryType) -> MetaData: 

258 meta = getattr(cls, "metadata", None) 

259 if meta is not None and isinstance(meta, MetaData): 

260 return meta 

261 return registry.metadata 

262 

263 

264def _validator_events(desc, key, validator, include_removes, include_backrefs): 

265 """Runs a validation method on an attribute value to be set or 

266 appended. 

267 """ 

268 

269 if not include_backrefs: 

270 

271 def detect_is_backref(state, initiator): 

272 impl = state.manager[key].impl 

273 return initiator.impl is not impl 

274 

275 if include_removes: 

276 

277 def append(state, value, initiator): 

278 if initiator.op is not attributes.OP_BULK_REPLACE and ( 

279 include_backrefs or not detect_is_backref(state, initiator) 

280 ): 

281 return validator(state.obj(), key, value, False) 

282 else: 

283 return value 

284 

285 def bulk_set(state, values, initiator): 

286 if include_backrefs or not detect_is_backref(state, initiator): 

287 obj = state.obj() 

288 values[:] = [ 

289 validator(obj, key, value, False) for value in values 

290 ] 

291 

292 def set_(state, value, oldvalue, initiator): 

293 if include_backrefs or not detect_is_backref(state, initiator): 

294 return validator(state.obj(), key, value, False) 

295 else: 

296 return value 

297 

298 def remove(state, value, initiator): 

299 if include_backrefs or not detect_is_backref(state, initiator): 

300 validator(state.obj(), key, value, True) 

301 

302 else: 

303 

304 def append(state, value, initiator): 

305 if initiator.op is not attributes.OP_BULK_REPLACE and ( 

306 include_backrefs or not detect_is_backref(state, initiator) 

307 ): 

308 return validator(state.obj(), key, value) 

309 else: 

310 return value 

311 

312 def bulk_set(state, values, initiator): 

313 if include_backrefs or not detect_is_backref(state, initiator): 

314 obj = state.obj() 

315 values[:] = [validator(obj, key, value) for value in values] 

316 

317 def set_(state, value, oldvalue, initiator): 

318 if include_backrefs or not detect_is_backref(state, initiator): 

319 return validator(state.obj(), key, value) 

320 else: 

321 return value 

322 

323 event.listen(desc, "append", append, raw=True, retval=True) 

324 event.listen(desc, "bulk_replace", bulk_set, raw=True) 

325 event.listen(desc, "set", set_, raw=True, retval=True) 

326 if include_removes: 

327 event.listen(desc, "remove", remove, raw=True, retval=True) 

328 

329 

330def polymorphic_union( 

331 table_map, typecolname, aliasname="p_union", cast_nulls=True 

332): 

333 """Create a ``UNION`` statement used by a polymorphic mapper. 

334 

335 See :ref:`concrete_inheritance` for an example of how 

336 this is used. 

337 

338 :param table_map: mapping of polymorphic identities to 

339 :class:`_schema.Table` objects. 

340 :param typecolname: string name of a "discriminator" column, which will be 

341 derived from the query, producing the polymorphic identity for 

342 each row. If ``None``, no polymorphic discriminator is generated. 

343 :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()` 

344 construct generated. 

345 :param cast_nulls: if True, non-existent columns, which are represented 

346 as labeled NULLs, will be passed into CAST. This is a legacy behavior 

347 that is problematic on some backends such as Oracle - in which case it 

348 can be set to False. 

349 

350 """ 

351 

352 colnames: util.OrderedSet[str] = util.OrderedSet() 

353 colnamemaps = {} 

354 types = {} 

355 for key in table_map: 

356 table = table_map[key] 

357 

358 table = coercions.expect(roles.FromClauseRole, table) 

359 table_map[key] = table 

360 

361 m = {} 

362 for c in table.c: 

363 if c.key == typecolname: 

364 raise sa_exc.InvalidRequestError( 

365 "Polymorphic union can't use '%s' as the discriminator " 

366 "column due to mapped column %r; please apply the " 

367 "'typecolname' " 

368 "argument; this is available on " 

369 "ConcreteBase as '_concrete_discriminator_name'" 

370 % (typecolname, c) 

371 ) 

372 colnames.add(c.key) 

373 m[c.key] = c 

374 types[c.key] = c.type 

375 colnamemaps[table] = m 

376 

377 def col(name, table): 

378 try: 

379 return colnamemaps[table][name] 

380 except KeyError: 

381 if cast_nulls: 

382 return sql.cast(sql.null(), types[name]).label(name) 

383 else: 

384 return sql.type_coerce(sql.null(), types[name]).label(name) 

385 

386 result = [] 

387 for type_, table in table_map.items(): 

388 if typecolname is not None: 

389 result.append( 

390 sql.select( 

391 *( 

392 [col(name, table) for name in colnames] 

393 + [ 

394 sql.literal_column( 

395 sql_util._quote_ddl_expr(type_) 

396 ).label(typecolname) 

397 ] 

398 ) 

399 ).select_from(table) 

400 ) 

401 else: 

402 result.append( 

403 sql.select( 

404 *[col(name, table) for name in colnames] 

405 ).select_from(table) 

406 ) 

407 return sql.union_all(*result).alias(aliasname) 

408 

409 

410def identity_key( 

411 class_: Optional[Type[_T]] = None, 

412 ident: Union[Any, Tuple[Any, ...]] = None, 

413 *, 

414 instance: Optional[_T] = None, 

415 row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None, 

416 identity_token: Optional[Any] = None, 

417) -> _IdentityKeyType[_T]: 

418 r"""Generate "identity key" tuples, as are used as keys in the 

419 :attr:`.Session.identity_map` dictionary. 

420 

421 This function has several call styles: 

422 

423 * ``identity_key(class, ident, identity_token=token)`` 

424 

425 This form receives a mapped class and a primary key scalar or 

426 tuple as an argument. 

427 

428 E.g.:: 

429 

430 >>> identity_key(MyClass, (1, 2)) 

431 (<class '__main__.MyClass'>, (1, 2), None) 

432 

433 :param class: mapped class (must be a positional argument) 

434 :param ident: primary key, may be a scalar or tuple argument. 

435 :param identity_token: optional identity token 

436 

437 * ``identity_key(instance=instance)`` 

438 

439 This form will produce the identity key for a given instance. The 

440 instance need not be persistent, only that its primary key attributes 

441 are populated (else the key will contain ``None`` for those missing 

442 values). 

443 

444 E.g.:: 

445 

446 >>> instance = MyClass(1, 2) 

447 >>> identity_key(instance=instance) 

448 (<class '__main__.MyClass'>, (1, 2), None) 

449 

450 In this form, the given instance is ultimately run though 

451 :meth:`_orm.Mapper.identity_key_from_instance`, which will have the 

452 effect of performing a database check for the corresponding row 

453 if the object is expired. 

454 

455 :param instance: object instance (must be given as a keyword arg) 

456 

457 * ``identity_key(class, row=row, identity_token=token)`` 

458 

459 This form is similar to the class/tuple form, except is passed a 

460 database result row as a :class:`.Row` or :class:`.RowMapping` object. 

461 

462 E.g.:: 

463 

464 >>> row = engine.execute(text("select * from table where a=1 and b=2")).first() 

465 >>> identity_key(MyClass, row=row) 

466 (<class '__main__.MyClass'>, (1, 2), None) 

467 

468 :param class: mapped class (must be a positional argument) 

469 :param row: :class:`.Row` row returned by a :class:`_engine.CursorResult` 

470 (must be given as a keyword arg) 

471 :param identity_token: optional identity token 

472 

473 """ # noqa: E501 

474 if class_ is not None: 

475 mapper = class_mapper(class_) 

476 if row is None: 

477 if ident is None: 

478 raise sa_exc.ArgumentError("ident or row is required") 

479 return mapper.identity_key_from_primary_key( 

480 tuple(util.to_list(ident)), identity_token=identity_token 

481 ) 

482 else: 

483 return mapper.identity_key_from_row( 

484 row, identity_token=identity_token 

485 ) 

486 elif instance is not None: 

487 mapper = object_mapper(instance) 

488 return mapper.identity_key_from_instance(instance) 

489 else: 

490 raise sa_exc.ArgumentError("class or instance is required") 

491 

492 

493class _TraceAdaptRole(enum.Enum): 

494 """Enumeration of all the use cases for ORMAdapter. 

495 

496 ORMAdapter remains one of the most complicated aspects of the ORM, as it is 

497 used for in-place adaption of column expressions to be applied to a SELECT, 

498 replacing :class:`.Table` and other objects that are mapped to classes with 

499 aliases of those tables in the case of joined eager loading, or in the case 

500 of polymorphic loading as used with concrete mappings or other custom "with 

501 polymorphic" parameters, with whole user-defined subqueries. The 

502 enumerations provide an overview of all the use cases used by ORMAdapter, a 

503 layer of formality as to the introduction of new ORMAdapter use cases (of 

504 which none are anticipated), as well as a means to trace the origins of a 

505 particular ORMAdapter within runtime debugging. 

506 

507 SQLAlchemy 2.0 has greatly scaled back ORM features which relied heavily on 

508 open-ended statement adaption, including the ``Query.with_polymorphic()`` 

509 method and the ``Query.select_from_entity()`` methods, favoring 

510 user-explicit aliasing schemes using the ``aliased()`` and 

511 ``with_polymorphic()`` standalone constructs; these still use adaption, 

512 however the adaption is applied in a narrower scope. 

513 

514 """ 

515 

516 # aliased() use that is used to adapt individual attributes at query 

517 # construction time 

518 ALIASED_INSP = enum.auto() 

519 

520 # joinedload cases; typically adapt an ON clause of a relationship 

521 # join 

522 JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto() 

523 JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto() 

524 JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto() 

525 

526 # polymorphic cases - these are complex ones that replace FROM 

527 # clauses, replacing tables with subqueries 

528 MAPPER_POLYMORPHIC_ADAPTER = enum.auto() 

529 WITH_POLYMORPHIC_ADAPTER = enum.auto() 

530 WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto() 

531 DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto() 

532 

533 # the from_statement() case, used only to adapt individual attributes 

534 # from a given statement to local ORM attributes at result fetching 

535 # time. assigned to ORMCompileState._from_obj_alias 

536 ADAPT_FROM_STATEMENT = enum.auto() 

537 

538 # the joinedload for queries that have LIMIT/OFFSET/DISTINCT case; 

539 # the query is placed inside of a subquery with the LIMIT/OFFSET/etc., 

540 # joinedloads are then placed on the outside. 

541 # assigned to ORMCompileState.compound_eager_adapter 

542 COMPOUND_EAGER_STATEMENT = enum.auto() 

543 

544 # the legacy Query._set_select_from() case. 

545 # this is needed for Query's set operations (i.e. UNION, etc. ) 

546 # as well as "legacy from_self()", which while removed from 2.0 as 

547 # public API, is used for the Query.count() method. this one 

548 # still does full statement traversal 

549 # assigned to ORMCompileState._from_obj_alias 

550 LEGACY_SELECT_FROM_ALIAS = enum.auto() 

551 

552 

553class ORMStatementAdapter(sql_util.ColumnAdapter): 

554 """ColumnAdapter which includes a role attribute.""" 

555 

556 __slots__ = ("role",) 

557 

558 def __init__( 

559 self, 

560 role: _TraceAdaptRole, 

561 selectable: Selectable, 

562 *, 

563 equivalents: Optional[_EquivalentColumnMap] = None, 

564 adapt_required: bool = False, 

565 allow_label_resolve: bool = True, 

566 anonymize_labels: bool = False, 

567 adapt_on_names: bool = False, 

568 adapt_from_selectables: Optional[AbstractSet[FromClause]] = None, 

569 ): 

570 self.role = role 

571 super().__init__( 

572 selectable, 

573 equivalents=equivalents, 

574 adapt_required=adapt_required, 

575 allow_label_resolve=allow_label_resolve, 

576 anonymize_labels=anonymize_labels, 

577 adapt_on_names=adapt_on_names, 

578 adapt_from_selectables=adapt_from_selectables, 

579 ) 

580 

581 

582class ORMAdapter(sql_util.ColumnAdapter): 

583 """ColumnAdapter subclass which excludes adaptation of entities from 

584 non-matching mappers. 

585 

586 """ 

587 

588 __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp") 

589 

590 is_aliased_class: bool 

591 aliased_insp: Optional[AliasedInsp[Any]] 

592 

593 def __init__( 

594 self, 

595 role: _TraceAdaptRole, 

596 entity: _InternalEntityType[Any], 

597 *, 

598 equivalents: Optional[_EquivalentColumnMap] = None, 

599 adapt_required: bool = False, 

600 allow_label_resolve: bool = True, 

601 anonymize_labels: bool = False, 

602 selectable: Optional[Selectable] = None, 

603 limit_on_entity: bool = True, 

604 adapt_on_names: bool = False, 

605 adapt_from_selectables: Optional[AbstractSet[FromClause]] = None, 

606 ): 

607 self.role = role 

608 self.mapper = entity.mapper 

609 if selectable is None: 

610 selectable = entity.selectable 

611 if insp_is_aliased_class(entity): 

612 self.is_aliased_class = True 

613 self.aliased_insp = entity 

614 else: 

615 self.is_aliased_class = False 

616 self.aliased_insp = None 

617 

618 super().__init__( 

619 selectable, 

620 equivalents, 

621 adapt_required=adapt_required, 

622 allow_label_resolve=allow_label_resolve, 

623 anonymize_labels=anonymize_labels, 

624 include_fn=self._include_fn if limit_on_entity else None, 

625 adapt_on_names=adapt_on_names, 

626 adapt_from_selectables=adapt_from_selectables, 

627 ) 

628 

629 def _include_fn(self, elem): 

630 entity = elem._annotations.get("parentmapper", None) 

631 

632 return not entity or entity.isa(self.mapper) or self.mapper.isa(entity) 

633 

634 

635class AliasedClass( 

636 inspection.Inspectable["AliasedInsp[_O]"], ORMColumnsClauseRole[_O] 

637): 

638 r"""Represents an "aliased" form of a mapped class for usage with Query. 

639 

640 The ORM equivalent of a :func:`~sqlalchemy.sql.expression.alias` 

641 construct, this object mimics the mapped class using a 

642 ``__getattr__`` scheme and maintains a reference to a 

643 real :class:`~sqlalchemy.sql.expression.Alias` object. 

644 

645 A primary purpose of :class:`.AliasedClass` is to serve as an alternate 

646 within a SQL statement generated by the ORM, such that an existing 

647 mapped entity can be used in multiple contexts. A simple example:: 

648 

649 # find all pairs of users with the same name 

650 user_alias = aliased(User) 

651 session.query(User, user_alias).join( 

652 (user_alias, User.id > user_alias.id) 

653 ).filter(User.name == user_alias.name) 

654 

655 :class:`.AliasedClass` is also capable of mapping an existing mapped 

656 class to an entirely new selectable, provided this selectable is column- 

657 compatible with the existing mapped selectable, and it can also be 

658 configured in a mapping as the target of a :func:`_orm.relationship`. 

659 See the links below for examples. 

660 

661 The :class:`.AliasedClass` object is constructed typically using the 

662 :func:`_orm.aliased` function. It also is produced with additional 

663 configuration when using the :func:`_orm.with_polymorphic` function. 

664 

665 The resulting object is an instance of :class:`.AliasedClass`. 

666 This object implements an attribute scheme which produces the 

667 same attribute and method interface as the original mapped 

668 class, allowing :class:`.AliasedClass` to be compatible 

669 with any attribute technique which works on the original class, 

670 including hybrid attributes (see :ref:`hybrids_toplevel`). 

671 

672 The :class:`.AliasedClass` can be inspected for its underlying 

673 :class:`_orm.Mapper`, aliased selectable, and other information 

674 using :func:`_sa.inspect`:: 

675 

676 from sqlalchemy import inspect 

677 

678 my_alias = aliased(MyClass) 

679 insp = inspect(my_alias) 

680 

681 The resulting inspection object is an instance of :class:`.AliasedInsp`. 

682 

683 

684 .. seealso:: 

685 

686 :func:`.aliased` 

687 

688 :func:`.with_polymorphic` 

689 

690 :ref:`relationship_aliased_class` 

691 

692 :ref:`relationship_to_window_function` 

693 

694 

695 """ 

696 

697 __name__: str 

698 

699 def __init__( 

700 self, 

701 mapped_class_or_ac: _EntityType[_O], 

702 alias: Optional[FromClause] = None, 

703 name: Optional[str] = None, 

704 flat: bool = False, 

705 adapt_on_names: bool = False, 

706 with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]] = None, 

707 with_polymorphic_discriminator: Optional[ColumnElement[Any]] = None, 

708 base_alias: Optional[AliasedInsp[Any]] = None, 

709 use_mapper_path: bool = False, 

710 represents_outer_join: bool = False, 

711 ): 

712 insp = cast( 

713 "_InternalEntityType[_O]", inspection.inspect(mapped_class_or_ac) 

714 ) 

715 mapper = insp.mapper 

716 

717 nest_adapters = False 

718 

719 if alias is None: 

720 if insp.is_aliased_class and insp.selectable._is_subquery: 

721 alias = insp.selectable.alias() 

722 else: 

723 alias = ( 

724 mapper._with_polymorphic_selectable._anonymous_fromclause( 

725 name=name, 

726 flat=flat, 

727 ) 

728 ) 

729 elif insp.is_aliased_class: 

730 nest_adapters = True 

731 

732 assert alias is not None 

733 self._aliased_insp = AliasedInsp( 

734 self, 

735 insp, 

736 alias, 

737 name, 

738 ( 

739 with_polymorphic_mappers 

740 if with_polymorphic_mappers 

741 else mapper.with_polymorphic_mappers 

742 ), 

743 ( 

744 with_polymorphic_discriminator 

745 if with_polymorphic_discriminator is not None 

746 else mapper.polymorphic_on 

747 ), 

748 base_alias, 

749 use_mapper_path, 

750 adapt_on_names, 

751 represents_outer_join, 

752 nest_adapters, 

753 ) 

754 

755 self.__name__ = f"aliased({mapper.class_.__name__})" 

756 

757 @classmethod 

758 def _reconstitute_from_aliased_insp( 

759 cls, aliased_insp: AliasedInsp[_O] 

760 ) -> AliasedClass[_O]: 

761 obj = cls.__new__(cls) 

762 obj.__name__ = f"aliased({aliased_insp.mapper.class_.__name__})" 

763 obj._aliased_insp = aliased_insp 

764 

765 if aliased_insp._is_with_polymorphic: 

766 for sub_aliased_insp in aliased_insp._with_polymorphic_entities: 

767 if sub_aliased_insp is not aliased_insp: 

768 ent = AliasedClass._reconstitute_from_aliased_insp( 

769 sub_aliased_insp 

770 ) 

771 setattr(obj, sub_aliased_insp.class_.__name__, ent) 

772 

773 return obj 

774 

775 def __getattr__(self, key: str) -> Any: 

776 try: 

777 _aliased_insp = self.__dict__["_aliased_insp"] 

778 except KeyError: 

779 raise AttributeError() 

780 else: 

781 target = _aliased_insp._target 

782 # maintain all getattr mechanics 

783 attr = getattr(target, key) 

784 

785 # attribute is a method, that will be invoked against a 

786 # "self"; so just return a new method with the same function and 

787 # new self 

788 if hasattr(attr, "__call__") and hasattr(attr, "__self__"): 

789 return types.MethodType(attr.__func__, self) 

790 

791 # attribute is a descriptor, that will be invoked against a 

792 # "self"; so invoke the descriptor against this self 

793 if hasattr(attr, "__get__"): 

794 attr = attr.__get__(None, self) 

795 

796 # attributes within the QueryableAttribute system will want this 

797 # to be invoked so the object can be adapted 

798 if hasattr(attr, "adapt_to_entity"): 

799 attr = attr.adapt_to_entity(_aliased_insp) 

800 setattr(self, key, attr) 

801 

802 return attr 

803 

804 def _get_from_serialized( 

805 self, key: str, mapped_class: _O, aliased_insp: AliasedInsp[_O] 

806 ) -> Any: 

807 # this method is only used in terms of the 

808 # sqlalchemy.ext.serializer extension 

809 attr = getattr(mapped_class, key) 

810 if hasattr(attr, "__call__") and hasattr(attr, "__self__"): 

811 return types.MethodType(attr.__func__, self) 

812 

813 # attribute is a descriptor, that will be invoked against a 

814 # "self"; so invoke the descriptor against this self 

815 if hasattr(attr, "__get__"): 

816 attr = attr.__get__(None, self) 

817 

818 # attributes within the QueryableAttribute system will want this 

819 # to be invoked so the object can be adapted 

820 if hasattr(attr, "adapt_to_entity"): 

821 aliased_insp._weak_entity = weakref.ref(self) 

822 attr = attr.adapt_to_entity(aliased_insp) 

823 setattr(self, key, attr) 

824 

825 return attr 

826 

827 def __repr__(self) -> str: 

828 return "<AliasedClass at 0x%x; %s>" % ( 

829 id(self), 

830 self._aliased_insp._target.__name__, 

831 ) 

832 

833 def __str__(self) -> str: 

834 return str(self._aliased_insp) 

835 

836 

837@inspection._self_inspects 

838class AliasedInsp( 

839 ORMEntityColumnsClauseRole[_O], 

840 ORMFromClauseRole, 

841 HasCacheKey, 

842 InspectionAttr, 

843 MemoizedSlots, 

844 inspection.Inspectable["AliasedInsp[_O]"], 

845 Generic[_O], 

846): 

847 """Provide an inspection interface for an 

848 :class:`.AliasedClass` object. 

849 

850 The :class:`.AliasedInsp` object is returned 

851 given an :class:`.AliasedClass` using the 

852 :func:`_sa.inspect` function:: 

853 

854 from sqlalchemy import inspect 

855 from sqlalchemy.orm import aliased 

856 

857 my_alias = aliased(MyMappedClass) 

858 insp = inspect(my_alias) 

859 

860 Attributes on :class:`.AliasedInsp` 

861 include: 

862 

863 * ``entity`` - the :class:`.AliasedClass` represented. 

864 * ``mapper`` - the :class:`_orm.Mapper` mapping the underlying class. 

865 * ``selectable`` - the :class:`_expression.Alias` 

866 construct which ultimately 

867 represents an aliased :class:`_schema.Table` or 

868 :class:`_expression.Select` 

869 construct. 

870 * ``name`` - the name of the alias. Also is used as the attribute 

871 name when returned in a result tuple from :class:`_query.Query`. 

872 * ``with_polymorphic_mappers`` - collection of :class:`_orm.Mapper` 

873 objects 

874 indicating all those mappers expressed in the select construct 

875 for the :class:`.AliasedClass`. 

876 * ``polymorphic_on`` - an alternate column or SQL expression which 

877 will be used as the "discriminator" for a polymorphic load. 

878 

879 .. seealso:: 

880 

881 :ref:`inspection_toplevel` 

882 

883 """ 

884 

885 __slots__ = ( 

886 "__weakref__", 

887 "_weak_entity", 

888 "mapper", 

889 "selectable", 

890 "name", 

891 "_adapt_on_names", 

892 "with_polymorphic_mappers", 

893 "polymorphic_on", 

894 "_use_mapper_path", 

895 "_base_alias", 

896 "represents_outer_join", 

897 "persist_selectable", 

898 "local_table", 

899 "_is_with_polymorphic", 

900 "_with_polymorphic_entities", 

901 "_adapter", 

902 "_target", 

903 "__clause_element__", 

904 "_memoized_values", 

905 "_all_column_expressions", 

906 "_nest_adapters", 

907 ) 

908 

909 _cache_key_traversal = [ 

910 ("name", visitors.ExtendedInternalTraversal.dp_string), 

911 ("_adapt_on_names", visitors.ExtendedInternalTraversal.dp_boolean), 

912 ("_use_mapper_path", visitors.ExtendedInternalTraversal.dp_boolean), 

913 ("_target", visitors.ExtendedInternalTraversal.dp_inspectable), 

914 ("selectable", visitors.ExtendedInternalTraversal.dp_clauseelement), 

915 ( 

916 "with_polymorphic_mappers", 

917 visitors.InternalTraversal.dp_has_cache_key_list, 

918 ), 

919 ("polymorphic_on", visitors.InternalTraversal.dp_clauseelement), 

920 ] 

921 

922 mapper: Mapper[_O] 

923 selectable: FromClause 

924 _adapter: ORMAdapter 

925 with_polymorphic_mappers: Sequence[Mapper[Any]] 

926 _with_polymorphic_entities: Sequence[AliasedInsp[Any]] 

927 

928 _weak_entity: weakref.ref[AliasedClass[_O]] 

929 """the AliasedClass that refers to this AliasedInsp""" 

930 

931 _target: Union[Type[_O], AliasedClass[_O]] 

932 """the thing referenced by the AliasedClass/AliasedInsp. 

933 

934 In the vast majority of cases, this is the mapped class. However 

935 it may also be another AliasedClass (alias of alias). 

936 

937 """ 

938 

939 def __init__( 

940 self, 

941 entity: AliasedClass[_O], 

942 inspected: _InternalEntityType[_O], 

943 selectable: FromClause, 

944 name: Optional[str], 

945 with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]], 

946 polymorphic_on: Optional[ColumnElement[Any]], 

947 _base_alias: Optional[AliasedInsp[Any]], 

948 _use_mapper_path: bool, 

949 adapt_on_names: bool, 

950 represents_outer_join: bool, 

951 nest_adapters: bool, 

952 ): 

953 mapped_class_or_ac = inspected.entity 

954 mapper = inspected.mapper 

955 

956 self._weak_entity = weakref.ref(entity) 

957 self.mapper = mapper 

958 self.selectable = self.persist_selectable = self.local_table = ( 

959 selectable 

960 ) 

961 self.name = name 

962 self.polymorphic_on = polymorphic_on 

963 self._base_alias = weakref.ref(_base_alias or self) 

964 self._use_mapper_path = _use_mapper_path 

965 self.represents_outer_join = represents_outer_join 

966 self._nest_adapters = nest_adapters 

967 

968 if with_polymorphic_mappers: 

969 self._is_with_polymorphic = True 

970 self.with_polymorphic_mappers = with_polymorphic_mappers 

971 self._with_polymorphic_entities = [] 

972 for poly in self.with_polymorphic_mappers: 

973 if poly is not mapper: 

974 ent = AliasedClass( 

975 poly.class_, 

976 selectable, 

977 base_alias=self, 

978 adapt_on_names=adapt_on_names, 

979 use_mapper_path=_use_mapper_path, 

980 ) 

981 

982 setattr(self.entity, poly.class_.__name__, ent) 

983 self._with_polymorphic_entities.append(ent._aliased_insp) 

984 

985 else: 

986 self._is_with_polymorphic = False 

987 self.with_polymorphic_mappers = [mapper] 

988 

989 self._adapter = ORMAdapter( 

990 _TraceAdaptRole.ALIASED_INSP, 

991 mapper, 

992 selectable=selectable, 

993 equivalents=mapper._equivalent_columns, 

994 adapt_on_names=adapt_on_names, 

995 anonymize_labels=True, 

996 # make sure the adapter doesn't try to grab other tables that 

997 # are not even the thing we are mapping, such as embedded 

998 # selectables in subqueries or CTEs. See issue #6060 

999 adapt_from_selectables={ 

1000 m.selectable 

1001 for m in self.with_polymorphic_mappers 

1002 if not adapt_on_names 

1003 }, 

1004 limit_on_entity=False, 

1005 ) 

1006 

1007 if nest_adapters: 

1008 # supports "aliased class of aliased class" use case 

1009 assert isinstance(inspected, AliasedInsp) 

1010 self._adapter = inspected._adapter.wrap(self._adapter) 

1011 

1012 self._adapt_on_names = adapt_on_names 

1013 self._target = mapped_class_or_ac 

1014 

1015 @property 

1016 def _post_inspect(self): # type: ignore[override] 

1017 self.mapper._check_configure() 

1018 

1019 @classmethod 

1020 def _alias_factory( 

1021 cls, 

1022 element: Union[_EntityType[_O], FromClause], 

1023 alias: Optional[FromClause] = None, 

1024 name: Optional[str] = None, 

1025 flat: bool = False, 

1026 adapt_on_names: bool = False, 

1027 ) -> Union[AliasedClass[_O], FromClause]: 

1028 if isinstance(element, FromClause): 

1029 if adapt_on_names: 

1030 raise sa_exc.ArgumentError( 

1031 "adapt_on_names only applies to ORM elements" 

1032 ) 

1033 if name: 

1034 return element.alias(name=name, flat=flat) 

1035 else: 

1036 # see selectable.py->Alias._factory() for similar 

1037 # mypy issue. Cannot get the overload to see this 

1038 # in mypy (works fine in pyright) 

1039 return coercions.expect( # type: ignore[no-any-return] 

1040 roles.AnonymizedFromClauseRole, element, flat=flat 

1041 ) 

1042 else: 

1043 return AliasedClass( 

1044 element, 

1045 alias=alias, 

1046 flat=flat, 

1047 name=name, 

1048 adapt_on_names=adapt_on_names, 

1049 ) 

1050 

1051 @classmethod 

1052 def _with_polymorphic_factory( 

1053 cls, 

1054 base: Union[Type[_O], Mapper[_O]], 

1055 classes: Union[Literal["*"], Iterable[_EntityType[Any]]], 

1056 selectable: Union[Literal[False, None], FromClause] = False, 

1057 flat: bool = False, 

1058 polymorphic_on: Optional[ColumnElement[Any]] = None, 

1059 aliased: bool = False, 

1060 innerjoin: bool = False, 

1061 adapt_on_names: bool = False, 

1062 name: Optional[str] = None, 

1063 _use_mapper_path: bool = False, 

1064 ) -> AliasedClass[_O]: 

1065 primary_mapper = _class_to_mapper(base) 

1066 

1067 if selectable not in (None, False) and flat: 

1068 raise sa_exc.ArgumentError( 

1069 "the 'flat' and 'selectable' arguments cannot be passed " 

1070 "simultaneously to with_polymorphic()" 

1071 ) 

1072 

1073 mappers, selectable = primary_mapper._with_polymorphic_args( 

1074 classes, selectable, innerjoin=innerjoin 

1075 ) 

1076 if aliased or flat: 

1077 assert selectable is not None 

1078 selectable = selectable._anonymous_fromclause(flat=flat) 

1079 

1080 return AliasedClass( 

1081 base, 

1082 selectable, 

1083 name=name, 

1084 with_polymorphic_mappers=mappers, 

1085 adapt_on_names=adapt_on_names, 

1086 with_polymorphic_discriminator=polymorphic_on, 

1087 use_mapper_path=_use_mapper_path, 

1088 represents_outer_join=not innerjoin, 

1089 ) 

1090 

1091 @property 

1092 def entity(self) -> AliasedClass[_O]: 

1093 # to eliminate reference cycles, the AliasedClass is held weakly. 

1094 # this produces some situations where the AliasedClass gets lost, 

1095 # particularly when one is created internally and only the AliasedInsp 

1096 # is passed around. 

1097 # to work around this case, we just generate a new one when we need 

1098 # it, as it is a simple class with very little initial state on it. 

1099 ent = self._weak_entity() 

1100 if ent is None: 

1101 ent = AliasedClass._reconstitute_from_aliased_insp(self) 

1102 self._weak_entity = weakref.ref(ent) 

1103 return ent 

1104 

1105 is_aliased_class = True 

1106 "always returns True" 

1107 

1108 def _memoized_method___clause_element__(self) -> FromClause: 

1109 return self.selectable._annotate( 

1110 { 

1111 "parentmapper": self.mapper, 

1112 "parententity": self, 

1113 "entity_namespace": self, 

1114 } 

1115 )._set_propagate_attrs( 

1116 {"compile_state_plugin": "orm", "plugin_subject": self} 

1117 ) 

1118 

1119 @property 

1120 def entity_namespace(self) -> AliasedClass[_O]: 

1121 return self.entity 

1122 

1123 @property 

1124 def class_(self) -> Type[_O]: 

1125 """Return the mapped class ultimately represented by this 

1126 :class:`.AliasedInsp`.""" 

1127 return self.mapper.class_ 

1128 

1129 @property 

1130 def _path_registry(self) -> _AbstractEntityRegistry: 

1131 if self._use_mapper_path: 

1132 return self.mapper._path_registry 

1133 else: 

1134 return PathRegistry.per_mapper(self) 

1135 

1136 def __getstate__(self) -> Dict[str, Any]: 

1137 return { 

1138 "entity": self.entity, 

1139 "mapper": self.mapper, 

1140 "alias": self.selectable, 

1141 "name": self.name, 

1142 "adapt_on_names": self._adapt_on_names, 

1143 "with_polymorphic_mappers": self.with_polymorphic_mappers, 

1144 "with_polymorphic_discriminator": self.polymorphic_on, 

1145 "base_alias": self._base_alias(), 

1146 "use_mapper_path": self._use_mapper_path, 

1147 "represents_outer_join": self.represents_outer_join, 

1148 "nest_adapters": self._nest_adapters, 

1149 } 

1150 

1151 def __setstate__(self, state: Dict[str, Any]) -> None: 

1152 self.__init__( # type: ignore 

1153 state["entity"], 

1154 state["mapper"], 

1155 state["alias"], 

1156 state["name"], 

1157 state["with_polymorphic_mappers"], 

1158 state["with_polymorphic_discriminator"], 

1159 state["base_alias"], 

1160 state["use_mapper_path"], 

1161 state["adapt_on_names"], 

1162 state["represents_outer_join"], 

1163 state["nest_adapters"], 

1164 ) 

1165 

1166 def _merge_with(self, other: AliasedInsp[_O]) -> AliasedInsp[_O]: 

1167 # assert self._is_with_polymorphic 

1168 # assert other._is_with_polymorphic 

1169 

1170 primary_mapper = other.mapper 

1171 

1172 assert self.mapper is primary_mapper 

1173 

1174 our_classes = util.to_set( 

1175 mp.class_ for mp in self.with_polymorphic_mappers 

1176 ) 

1177 new_classes = {mp.class_ for mp in other.with_polymorphic_mappers} 

1178 if our_classes == new_classes: 

1179 return other 

1180 else: 

1181 classes = our_classes.union(new_classes) 

1182 

1183 mappers, selectable = primary_mapper._with_polymorphic_args( 

1184 classes, None, innerjoin=not other.represents_outer_join 

1185 ) 

1186 selectable = selectable._anonymous_fromclause(flat=True) 

1187 return AliasedClass( 

1188 primary_mapper, 

1189 selectable, 

1190 with_polymorphic_mappers=mappers, 

1191 with_polymorphic_discriminator=other.polymorphic_on, 

1192 use_mapper_path=other._use_mapper_path, 

1193 represents_outer_join=other.represents_outer_join, 

1194 )._aliased_insp 

1195 

1196 def _adapt_element( 

1197 self, expr: _ORMCOLEXPR, key: Optional[str] = None 

1198 ) -> _ORMCOLEXPR: 

1199 assert isinstance(expr, ColumnElement) 

1200 d: Dict[str, Any] = { 

1201 "parententity": self, 

1202 "parentmapper": self.mapper, 

1203 } 

1204 if key: 

1205 d["proxy_key"] = key 

1206 

1207 # userspace adapt of an attribute from AliasedClass; validate that 

1208 # it actually was present 

1209 adapted = self._adapter.adapt_check_present(expr) 

1210 if adapted is None: 

1211 adapted = expr 

1212 if self._adapter.adapt_on_names: 

1213 util.warn_limited( 

1214 "Did not locate an expression in selectable for " 

1215 "attribute %r; ensure name is correct in expression", 

1216 (key,), 

1217 ) 

1218 else: 

1219 util.warn_limited( 

1220 "Did not locate an expression in selectable for " 

1221 "attribute %r; to match by name, use the " 

1222 "adapt_on_names parameter", 

1223 (key,), 

1224 ) 

1225 

1226 return adapted._annotate(d)._set_propagate_attrs( 

1227 {"compile_state_plugin": "orm", "plugin_subject": self} 

1228 ) 

1229 

1230 if TYPE_CHECKING: 

1231 # establish compatibility with the _ORMAdapterProto protocol, 

1232 # which in turn is compatible with _CoreAdapterProto. 

1233 

1234 def _orm_adapt_element( 

1235 self, 

1236 obj: _CE, 

1237 key: Optional[str] = None, 

1238 ) -> _CE: ... 

1239 

1240 else: 

1241 _orm_adapt_element = _adapt_element 

1242 

1243 def _entity_for_mapper(self, mapper): 

1244 self_poly = self.with_polymorphic_mappers 

1245 if mapper in self_poly: 

1246 if mapper is self.mapper: 

1247 return self 

1248 else: 

1249 return getattr( 

1250 self.entity, mapper.class_.__name__ 

1251 )._aliased_insp 

1252 elif mapper.isa(self.mapper): 

1253 return self 

1254 else: 

1255 assert False, "mapper %s doesn't correspond to %s" % (mapper, self) 

1256 

1257 def _memoized_attr__get_clause(self): 

1258 onclause, replacemap = self.mapper._get_clause 

1259 return ( 

1260 self._adapter.traverse(onclause), 

1261 { 

1262 self._adapter.traverse(col): param 

1263 for col, param in replacemap.items() 

1264 }, 

1265 ) 

1266 

1267 def _memoized_attr__memoized_values(self): 

1268 return {} 

1269 

1270 def _memoized_attr__all_column_expressions(self): 

1271 if self._is_with_polymorphic: 

1272 cols_plus_keys = self.mapper._columns_plus_keys( 

1273 [ent.mapper for ent in self._with_polymorphic_entities] 

1274 ) 

1275 else: 

1276 cols_plus_keys = self.mapper._columns_plus_keys() 

1277 

1278 cols_plus_keys = [ 

1279 (key, self._adapt_element(col)) for key, col in cols_plus_keys 

1280 ] 

1281 

1282 return WriteableColumnCollection(cols_plus_keys) 

1283 

1284 def _memo(self, key, callable_, *args, **kw): 

1285 if key in self._memoized_values: 

1286 return self._memoized_values[key] 

1287 else: 

1288 self._memoized_values[key] = value = callable_(*args, **kw) 

1289 return value 

1290 

1291 def __repr__(self): 

1292 if self.with_polymorphic_mappers: 

1293 with_poly = "(%s)" % ", ".join( 

1294 mp.class_.__name__ for mp in self.with_polymorphic_mappers 

1295 ) 

1296 else: 

1297 with_poly = "" 

1298 return "<AliasedInsp at 0x%x; %s%s>" % ( 

1299 id(self), 

1300 self.class_.__name__, 

1301 with_poly, 

1302 ) 

1303 

1304 def __str__(self): 

1305 if self._is_with_polymorphic: 

1306 return "with_polymorphic(%s, [%s])" % ( 

1307 self._target.__name__, 

1308 ", ".join( 

1309 mp.class_.__name__ 

1310 for mp in self.with_polymorphic_mappers 

1311 if mp is not self.mapper 

1312 ), 

1313 ) 

1314 else: 

1315 return "aliased(%s)" % (self._target.__name__,) 

1316 

1317 

1318class _WrapUserEntity: 

1319 """A wrapper used within the loader_criteria lambda caller so that 

1320 we can bypass declared_attr descriptors on unmapped mixins, which 

1321 normally emit a warning for such use. 

1322 

1323 might also be useful for other per-lambda instrumentations should 

1324 the need arise. 

1325 

1326 """ 

1327 

1328 __slots__ = ("subject",) 

1329 

1330 def __init__(self, subject): 

1331 self.subject = subject 

1332 

1333 @util.preload_module("sqlalchemy.orm.decl_api") 

1334 def __getattribute__(self, name): 

1335 decl_api = util.preloaded.orm.decl_api 

1336 

1337 subject = object.__getattribute__(self, "subject") 

1338 if name in subject.__dict__ and isinstance( 

1339 subject.__dict__[name], decl_api.declared_attr 

1340 ): 

1341 return subject.__dict__[name].fget(subject) 

1342 else: 

1343 return getattr(subject, name) 

1344 

1345 

1346class LoaderCriteriaOption(CriteriaOption): 

1347 """Add additional WHERE criteria to the load for all occurrences of 

1348 a particular entity. 

1349 

1350 :class:`_orm.LoaderCriteriaOption` is invoked using the 

1351 :func:`_orm.with_loader_criteria` function; see that function for 

1352 details. 

1353 

1354 .. versionadded:: 1.4 

1355 

1356 """ 

1357 

1358 __slots__ = ( 

1359 "root_entity", 

1360 "entity", 

1361 "deferred_where_criteria", 

1362 "where_criteria", 

1363 "_where_crit_orig", 

1364 "include_aliases", 

1365 "propagate_to_loaders", 

1366 ) 

1367 

1368 _traverse_internals = [ 

1369 ("root_entity", visitors.ExtendedInternalTraversal.dp_plain_obj), 

1370 ("entity", visitors.ExtendedInternalTraversal.dp_has_cache_key), 

1371 ("where_criteria", visitors.InternalTraversal.dp_clauseelement), 

1372 ("include_aliases", visitors.InternalTraversal.dp_boolean), 

1373 ("propagate_to_loaders", visitors.InternalTraversal.dp_boolean), 

1374 ] 

1375 

1376 root_entity: Optional[Type[Any]] 

1377 entity: Optional[_InternalEntityType[Any]] 

1378 where_criteria: Union[ColumnElement[bool], lambdas.DeferredLambdaElement] 

1379 deferred_where_criteria: bool 

1380 include_aliases: bool 

1381 propagate_to_loaders: bool 

1382 

1383 _where_crit_orig: Any 

1384 

1385 def __init__( 

1386 self, 

1387 entity_or_base: _EntityType[Any], 

1388 where_criteria: Union[ 

1389 _ColumnExpressionArgument[bool], 

1390 Callable[[Any], _ColumnExpressionArgument[bool]], 

1391 ], 

1392 loader_only: bool = False, 

1393 include_aliases: bool = False, 

1394 propagate_to_loaders: bool = True, 

1395 track_closure_variables: bool = True, 

1396 ): 

1397 entity = cast( 

1398 "_InternalEntityType[Any]", 

1399 inspection.inspect(entity_or_base, False), 

1400 ) 

1401 if entity is None: 

1402 self.root_entity = cast("Type[Any]", entity_or_base) 

1403 self.entity = None 

1404 else: 

1405 self.root_entity = None 

1406 self.entity = entity 

1407 

1408 self._where_crit_orig = where_criteria 

1409 if callable(where_criteria): 

1410 if self.root_entity is not None: 

1411 wrap_entity = self.root_entity 

1412 else: 

1413 assert entity is not None 

1414 wrap_entity = entity.entity 

1415 

1416 self.deferred_where_criteria = True 

1417 self.where_criteria = lambdas.DeferredLambdaElement( 

1418 where_criteria, 

1419 roles.WhereHavingRole, 

1420 lambda_args=(_WrapUserEntity(wrap_entity),), 

1421 opts=lambdas.LambdaOptions( 

1422 track_closure_variables=track_closure_variables 

1423 ), 

1424 ) 

1425 else: 

1426 self.deferred_where_criteria = False 

1427 self.where_criteria = coercions.expect( 

1428 roles.WhereHavingRole, where_criteria 

1429 ) 

1430 

1431 self.include_aliases = include_aliases 

1432 self.propagate_to_loaders = propagate_to_loaders 

1433 

1434 @classmethod 

1435 def _unreduce( 

1436 cls, entity, where_criteria, include_aliases, propagate_to_loaders 

1437 ): 

1438 return LoaderCriteriaOption( 

1439 entity, 

1440 where_criteria, 

1441 include_aliases=include_aliases, 

1442 propagate_to_loaders=propagate_to_loaders, 

1443 ) 

1444 

1445 def __reduce__(self): 

1446 return ( 

1447 LoaderCriteriaOption._unreduce, 

1448 ( 

1449 self.entity.class_ if self.entity else self.root_entity, 

1450 self._where_crit_orig, 

1451 self.include_aliases, 

1452 self.propagate_to_loaders, 

1453 ), 

1454 ) 

1455 

1456 def _all_mappers(self) -> Iterator[Mapper[Any]]: 

1457 if self.entity: 

1458 yield from self.entity.mapper.self_and_descendants 

1459 else: 

1460 assert self.root_entity 

1461 stack = list(self.root_entity.__subclasses__()) 

1462 while stack: 

1463 subclass = stack.pop(0) 

1464 ent = cast( 

1465 "_InternalEntityType[Any]", 

1466 inspection.inspect(subclass, raiseerr=False), 

1467 ) 

1468 if ent: 

1469 yield from ent.mapper.self_and_descendants 

1470 else: 

1471 stack.extend(subclass.__subclasses__()) 

1472 

1473 def _should_include(self, compile_state: _ORMCompileState) -> bool: 

1474 if ( 

1475 compile_state.select_statement._annotations.get( 

1476 "for_loader_criteria", None 

1477 ) 

1478 is self 

1479 ): 

1480 return False 

1481 return True 

1482 

1483 def _resolve_where_criteria( 

1484 self, ext_info: _InternalEntityType[Any] 

1485 ) -> ColumnElement[bool]: 

1486 if self.deferred_where_criteria: 

1487 crit = cast( 

1488 "ColumnElement[bool]", 

1489 self.where_criteria._resolve_with_args(ext_info.entity), 

1490 ) 

1491 else: 

1492 crit = self.where_criteria # type: ignore 

1493 assert isinstance(crit, ColumnElement) 

1494 return sql_util._deep_annotate( 

1495 crit, 

1496 {"for_loader_criteria": self}, 

1497 detect_subquery_cols=True, 

1498 ind_cols_on_fromclause=True, 

1499 ) 

1500 

1501 def process_compile_state_replaced_entities( 

1502 self, 

1503 compile_state: _ORMCompileState, 

1504 mapper_entities: Iterable[_MapperEntity], 

1505 ) -> None: 

1506 self.process_compile_state(compile_state) 

1507 

1508 def process_compile_state(self, compile_state: _ORMCompileState) -> None: 

1509 """Apply a modification to a given :class:`.CompileState`.""" 

1510 

1511 # if options to limit the criteria to immediate query only, 

1512 # use compile_state.attributes instead 

1513 

1514 self.get_global_criteria(compile_state.global_attributes) 

1515 

1516 def get_global_criteria(self, attributes: Dict[Any, Any]) -> None: 

1517 for mp in self._all_mappers(): 

1518 load_criteria = attributes.setdefault( 

1519 ("additional_entity_criteria", mp), [] 

1520 ) 

1521 

1522 load_criteria.append(self) 

1523 

1524 

1525inspection._inspects(AliasedClass)(lambda target: target._aliased_insp) 

1526 

1527 

1528@inspection._inspects(type) 

1529def _inspect_mc( 

1530 class_: Type[_O], 

1531) -> Optional[Mapper[_O]]: 

1532 try: 

1533 class_manager = opt_manager_of_class(class_) 

1534 if class_manager is None or not class_manager.is_mapped: 

1535 return None 

1536 mapper = class_manager.mapper 

1537 except orm_exc.NO_STATE: 

1538 return None 

1539 else: 

1540 return mapper 

1541 

1542 

1543GenericAlias = type(List[Any]) 

1544 

1545 

1546@inspection._inspects(GenericAlias) 

1547def _inspect_generic_alias( 

1548 class_: Type[_O], 

1549) -> Optional[Mapper[_O]]: 

1550 origin = cast("Type[_O]", get_origin(class_)) 

1551 return _inspect_mc(origin) 

1552 

1553 

1554@inspection._self_inspects 

1555class Bundle( 

1556 ORMColumnsClauseRole[_T], 

1557 SupportsCloneAnnotations, 

1558 MemoizedHasCacheKey, 

1559 inspection.Inspectable["Bundle[_T]"], 

1560 InspectionAttr, 

1561): 

1562 """A grouping of SQL expressions that are returned by a :class:`.Query` 

1563 under one namespace. 

1564 

1565 The :class:`.Bundle` essentially allows nesting of the tuple-based 

1566 results returned by a column-oriented :class:`_query.Query` object. 

1567 It also 

1568 is extensible via simple subclassing, where the primary capability 

1569 to override is that of how the set of expressions should be returned, 

1570 allowing post-processing as well as custom return types, without 

1571 involving ORM identity-mapped classes. 

1572 

1573 .. seealso:: 

1574 

1575 :ref:`bundles` 

1576 

1577 :class:`.DictBundle` 

1578 

1579 """ 

1580 

1581 single_entity = False 

1582 """If True, queries for a single Bundle will be returned as a single 

1583 entity, rather than an element within a keyed tuple.""" 

1584 

1585 is_clause_element = False 

1586 

1587 is_mapper = False 

1588 

1589 is_aliased_class = False 

1590 

1591 is_bundle = True 

1592 

1593 _propagate_attrs: _PropagateAttrsType = util.immutabledict() 

1594 

1595 proxy_set = util.EMPTY_SET 

1596 

1597 exprs: List[_ColumnsClauseElement] 

1598 

1599 def __init__( 

1600 self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any 

1601 ) -> None: 

1602 r"""Construct a new :class:`.Bundle`. 

1603 

1604 e.g.:: 

1605 

1606 bn = Bundle("mybundle", MyClass.x, MyClass.y) 

1607 

1608 for row in session.query(bn).filter(bn.c.x == 5).filter(bn.c.y == 4): 

1609 print(row.mybundle.x, row.mybundle.y) 

1610 

1611 :param name: name of the bundle. 

1612 :param \*exprs: columns or SQL expressions comprising the bundle. 

1613 :param single_entity=False: if True, rows for this :class:`.Bundle` 

1614 can be returned as a "single entity" outside of any enclosing tuple 

1615 in the same manner as a mapped entity. 

1616 

1617 """ # noqa: E501 

1618 self.name = self._label = name 

1619 coerced_exprs = [ 

1620 coercions.expect( 

1621 roles.ColumnsClauseRole, expr, apply_propagate_attrs=self 

1622 ) 

1623 for expr in exprs 

1624 ] 

1625 self.exprs = coerced_exprs 

1626 

1627 self.c = self.columns = WriteableColumnCollection( 

1628 (getattr(col, "key", col._label), col) 

1629 for col in [e._annotations.get("bundle", e) for e in coerced_exprs] 

1630 ).as_readonly() 

1631 self.single_entity = kw.pop("single_entity", self.single_entity) 

1632 

1633 def _gen_cache_key( 

1634 self, anon_map: anon_map, bindparams: List[BindParameter[Any]] 

1635 ) -> Tuple[Any, ...]: 

1636 return (self.__class__, self.name, self.single_entity) + tuple( 

1637 [expr._gen_cache_key(anon_map, bindparams) for expr in self.exprs] 

1638 ) 

1639 

1640 @property 

1641 def mapper(self) -> Optional[Mapper[Any]]: 

1642 mp: Optional[Mapper[Any]] = self.exprs[0]._annotations.get( 

1643 "parentmapper", None 

1644 ) 

1645 return mp 

1646 

1647 @property 

1648 def entity(self) -> Optional[_InternalEntityType[Any]]: 

1649 ie: Optional[_InternalEntityType[Any]] = self.exprs[ 

1650 0 

1651 ]._annotations.get("parententity", None) 

1652 return ie 

1653 

1654 @property 

1655 def entity_namespace( 

1656 self, 

1657 ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]: 

1658 return self.c 

1659 

1660 columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]] 

1661 

1662 """A namespace of SQL expressions referred to by this :class:`.Bundle`. 

1663 

1664 e.g.:: 

1665 

1666 bn = Bundle("mybundle", MyClass.x, MyClass.y) 

1667 

1668 q = sess.query(bn).filter(bn.c.x == 5) 

1669 

1670 Nesting of bundles is also supported:: 

1671 

1672 b1 = Bundle( 

1673 "b1", 

1674 Bundle("b2", MyClass.a, MyClass.b), 

1675 Bundle("b3", MyClass.x, MyClass.y), 

1676 ) 

1677 

1678 q = sess.query(b1).filter(b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9) 

1679 

1680 .. seealso:: 

1681 

1682 :attr:`.Bundle.c` 

1683 

1684 """ # noqa: E501 

1685 

1686 c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]] 

1687 """An alias for :attr:`.Bundle.columns`.""" 

1688 

1689 def _clone(self, **kw): 

1690 cloned = self.__class__.__new__(self.__class__) 

1691 cloned.__dict__.update(self.__dict__) 

1692 return cloned 

1693 

1694 def __clause_element__(self): 

1695 # ensure existing entity_namespace remains 

1696 annotations = {"bundle": self, "entity_namespace": self} 

1697 annotations.update(self._annotations) 

1698 

1699 plugin_subject = self.exprs[0]._propagate_attrs.get( 

1700 "plugin_subject", self.entity 

1701 ) 

1702 return ( 

1703 expression.ClauseList( 

1704 _literal_as_text_role=roles.ColumnsClauseRole, 

1705 group=False, 

1706 *[e._annotations.get("bundle", e) for e in self.exprs], 

1707 ) 

1708 ._annotate(annotations) 

1709 ._set_propagate_attrs( 

1710 # the Bundle *must* use the orm plugin no matter what. the 

1711 # subject can be None but it's much better if it's not. 

1712 { 

1713 "compile_state_plugin": "orm", 

1714 "plugin_subject": plugin_subject, 

1715 } 

1716 ) 

1717 ) 

1718 

1719 @property 

1720 def clauses(self): 

1721 return self.__clause_element__().clauses 

1722 

1723 def label(self, name): 

1724 """Provide a copy of this :class:`.Bundle` passing a new label.""" 

1725 

1726 cloned = self._clone() 

1727 cloned.name = name 

1728 return cloned 

1729 

1730 def create_row_processor( 

1731 self, 

1732 query: Select[Unpack[TupleAny]], 

1733 procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]], 

1734 labels: Sequence[str], 

1735 ) -> Callable[[Row[Unpack[TupleAny]]], Any]: 

1736 """Produce the "row processing" function for this :class:`.Bundle`. 

1737 

1738 May be overridden by subclasses to provide custom behaviors when 

1739 results are fetched. The method is passed the statement object and a 

1740 set of "row processor" functions at query execution time; these 

1741 processor functions when given a result row will return the individual 

1742 attribute value, which can then be adapted into any kind of return data 

1743 structure. 

1744 

1745 The example below illustrates replacing the usual :class:`.Row` 

1746 return structure with a straight Python dictionary:: 

1747 

1748 from sqlalchemy.orm import Bundle 

1749 

1750 

1751 class DictBundle(Bundle): 

1752 def create_row_processor(self, query, procs, labels): 

1753 "Override create_row_processor to return values as dictionaries" 

1754 

1755 def proc(row): 

1756 return dict(zip(labels, (proc(row) for proc in procs))) 

1757 

1758 return proc 

1759 

1760 A result from the above :class:`_orm.Bundle` will return dictionary 

1761 values:: 

1762 

1763 bn = DictBundle("mybundle", MyClass.data1, MyClass.data2) 

1764 for row in session.execute(select(bn)).where(bn.c.data1 == "d1"): 

1765 print(row.mybundle["data1"], row.mybundle["data2"]) 

1766 

1767 The above example is available natively using :class:`.DictBundle` 

1768 

1769 .. seealso:: 

1770 

1771 :class:`.DictBundle` 

1772 

1773 """ # noqa: E501 

1774 keyed_tuple = result_tuple(labels, [() for l in labels]) 

1775 

1776 def proc(row: Row[Unpack[TupleAny]]) -> Any: 

1777 return keyed_tuple([proc(row) for proc in procs]) 

1778 

1779 return proc 

1780 

1781 

1782class DictBundle(Bundle[_T]): 

1783 """Like :class:`.Bundle` but returns ``dict`` instances instead of 

1784 named tuple like objects:: 

1785 

1786 bn = DictBundle("mybundle", MyClass.data1, MyClass.data2) 

1787 for row in session.execute(select(bn)).where(bn.c.data1 == "d1"): 

1788 print(row.mybundle["data1"], row.mybundle["data2"]) 

1789 

1790 Differently from :class:`.Bundle`, multiple columns with the same name are 

1791 not supported. 

1792 

1793 .. versionadded:: 2.1 

1794 

1795 .. seealso:: 

1796 

1797 :ref:`bundles` 

1798 

1799 :class:`.Bundle` 

1800 """ 

1801 

1802 def __init__( 

1803 self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any 

1804 ) -> None: 

1805 super().__init__(name, *exprs, **kw) 

1806 if len(set(self.c.keys())) != len(self.c): 

1807 raise sa_exc.ArgumentError( 

1808 "DictBundle does not support duplicate column names" 

1809 ) 

1810 

1811 def create_row_processor( 

1812 self, 

1813 query: Select[Unpack[TupleAny]], 

1814 procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]], 

1815 labels: Sequence[str], 

1816 ) -> Callable[[Row[Unpack[TupleAny]]], dict[str, Any]]: 

1817 def proc(row: Row[Unpack[TupleAny]]) -> dict[str, Any]: 

1818 return dict(zip(labels, (proc(row) for proc in procs))) 

1819 

1820 return proc 

1821 

1822 

1823def _orm_full_deannotate(element: _SA) -> _SA: 

1824 return sql_util._deep_deannotate(element) 

1825 

1826 

1827class _ORMJoin(expression.Join): 

1828 """Extend Join to support ORM constructs as input.""" 

1829 

1830 __visit_name__ = expression.Join.__visit_name__ 

1831 

1832 inherit_cache = True 

1833 

1834 def __init__( 

1835 self, 

1836 left: _FromClauseArgument, 

1837 right: _FromClauseArgument, 

1838 onclause: Optional[_OnClauseArgument] = None, 

1839 isouter: bool = False, 

1840 full: bool = False, 

1841 _left_memo: Optional[Any] = None, 

1842 _right_memo: Optional[Any] = None, 

1843 _extra_criteria: Tuple[ColumnElement[bool], ...] = (), 

1844 ): 

1845 left_info = cast( 

1846 "Union[FromClause, _InternalEntityType[Any]]", 

1847 inspection.inspect(left), 

1848 ) 

1849 

1850 right_info = cast( 

1851 "Union[FromClause, _InternalEntityType[Any]]", 

1852 inspection.inspect(right), 

1853 ) 

1854 adapt_to = right_info.selectable 

1855 

1856 # used by joined eager loader 

1857 self._left_memo = _left_memo 

1858 self._right_memo = _right_memo 

1859 

1860 if isinstance(onclause, attributes.QueryableAttribute): 

1861 if TYPE_CHECKING: 

1862 assert isinstance( 

1863 onclause.comparator, RelationshipProperty.Comparator 

1864 ) 

1865 on_selectable = onclause.comparator._source_selectable() 

1866 prop = onclause.property 

1867 _extra_criteria += onclause._extra_criteria 

1868 elif isinstance(onclause, MapperProperty): 

1869 # used internally by joined eager loader...possibly not ideal 

1870 prop = onclause 

1871 on_selectable = prop.parent.selectable 

1872 else: 

1873 prop = None 

1874 on_selectable = None 

1875 

1876 left_selectable = left_info.selectable 

1877 if prop: 

1878 adapt_from: Optional[FromClause] 

1879 if sql_util.clause_is_present(on_selectable, left_selectable): 

1880 adapt_from = on_selectable 

1881 else: 

1882 assert isinstance(left_selectable, FromClause) 

1883 adapt_from = left_selectable 

1884 

1885 ( 

1886 pj, 

1887 sj, 

1888 source, 

1889 dest, 

1890 secondary, 

1891 target_adapter, 

1892 ) = prop._create_joins( 

1893 source_selectable=adapt_from, 

1894 dest_selectable=adapt_to, 

1895 source_polymorphic=True, 

1896 of_type_entity=right_info, 

1897 alias_secondary=True, 

1898 extra_criteria=_extra_criteria, 

1899 ) 

1900 

1901 if sj is not None: 

1902 if isouter: 

1903 # note this is an inner join from secondary->right 

1904 right = sql.join(secondary, right, sj) 

1905 onclause = pj 

1906 else: 

1907 left = sql.join(left, secondary, pj, isouter) 

1908 onclause = sj 

1909 else: 

1910 onclause = pj 

1911 

1912 self._target_adapter = target_adapter 

1913 

1914 # we don't use the normal coercions logic for _ORMJoin 

1915 # (probably should), so do some gymnastics to get the entity. 

1916 # logic here is for #8721, which was a major bug in 1.4 

1917 # for almost two years, not reported/fixed until 1.4.43 (!) 

1918 if is_selectable(left_info): 

1919 parententity = left_selectable._annotations.get( 

1920 "parententity", None 

1921 ) 

1922 elif insp_is_mapper(left_info) or insp_is_aliased_class(left_info): 

1923 parententity = left_info 

1924 else: 

1925 parententity = None 

1926 

1927 if parententity is not None: 

1928 self._annotations = self._annotations.union( 

1929 {"parententity": parententity} 

1930 ) 

1931 

1932 augment_onclause = bool(_extra_criteria) and not prop 

1933 expression.Join.__init__(self, left, right, onclause, isouter, full) 

1934 

1935 assert self.onclause is not None 

1936 

1937 if augment_onclause: 

1938 self.onclause &= sql.and_(*_extra_criteria) 

1939 

1940 if ( 

1941 not prop 

1942 and getattr(right_info, "mapper", None) 

1943 and right_info.mapper.single # type: ignore 

1944 ): 

1945 right_info = cast("_InternalEntityType[Any]", right_info) 

1946 # if single inheritance target and we are using a manual 

1947 # or implicit ON clause, augment it the same way we'd augment the 

1948 # WHERE. 

1949 single_crit = right_info.mapper._single_table_criterion 

1950 if single_crit is not None: 

1951 if insp_is_aliased_class(right_info): 

1952 single_crit = right_info._adapter.traverse(single_crit) 

1953 self.onclause = self.onclause & single_crit 

1954 

1955 def _splice_into_center(self, other): 

1956 """Splice a join into the center. 

1957 

1958 Given join(a, b) and join(b, c), return join(a, b).join(c) 

1959 

1960 """ 

1961 leftmost = other 

1962 while isinstance(leftmost, sql.Join): 

1963 leftmost = leftmost.left 

1964 

1965 assert self.right is leftmost 

1966 

1967 left = _ORMJoin( 

1968 self.left, 

1969 other.left, 

1970 self.onclause, 

1971 isouter=self.isouter, 

1972 _left_memo=self._left_memo, 

1973 _right_memo=other._left_memo._path_registry, 

1974 ) 

1975 

1976 return _ORMJoin( 

1977 left, 

1978 other.right, 

1979 other.onclause, 

1980 isouter=other.isouter, 

1981 _right_memo=other._right_memo, 

1982 ) 

1983 

1984 def join( 

1985 self, 

1986 right: _FromClauseArgument, 

1987 onclause: Optional[_OnClauseArgument] = None, 

1988 isouter: bool = False, 

1989 full: bool = False, 

1990 ) -> _ORMJoin: 

1991 return _ORMJoin(self, right, onclause, full=full, isouter=isouter) 

1992 

1993 def outerjoin( 

1994 self, 

1995 right: _FromClauseArgument, 

1996 onclause: Optional[_OnClauseArgument] = None, 

1997 full: bool = False, 

1998 ) -> _ORMJoin: 

1999 return _ORMJoin(self, right, onclause, isouter=True, full=full) 

2000 

2001 

2002def with_parent( 

2003 instance: object, 

2004 prop: attributes.QueryableAttribute[Any], 

2005 from_entity: Optional[_EntityType[Any]] = None, 

2006) -> ColumnElement[bool]: 

2007 """Create filtering criterion that relates this query's primary entity 

2008 to the given related instance, using established 

2009 :func:`_orm.relationship()` 

2010 configuration. 

2011 

2012 E.g.:: 

2013 

2014 stmt = select(Address).where(with_parent(some_user, User.addresses)) 

2015 

2016 The SQL rendered is the same as that rendered when a lazy loader 

2017 would fire off from the given parent on that attribute, meaning 

2018 that the appropriate state is taken from the parent object in 

2019 Python without the need to render joins to the parent table 

2020 in the rendered statement. 

2021 

2022 The given property may also make use of :meth:`_orm.PropComparator.of_type` 

2023 to indicate the left side of the criteria:: 

2024 

2025 

2026 a1 = aliased(Address) 

2027 a2 = aliased(Address) 

2028 stmt = select(a1, a2).where(with_parent(u1, User.addresses.of_type(a2))) 

2029 

2030 The above use is equivalent to using the 

2031 :func:`_orm.with_parent.from_entity` argument:: 

2032 

2033 a1 = aliased(Address) 

2034 a2 = aliased(Address) 

2035 stmt = select(a1, a2).where( 

2036 with_parent(u1, User.addresses, from_entity=a2) 

2037 ) 

2038 

2039 :param instance: 

2040 An instance which has some :func:`_orm.relationship`. 

2041 

2042 :param property: 

2043 Class-bound attribute, which indicates 

2044 what relationship from the instance should be used to reconcile the 

2045 parent/child relationship. 

2046 

2047 :param from_entity: 

2048 Entity in which to consider as the left side. This defaults to the 

2049 "zero" entity of the :class:`_query.Query` itself. 

2050 

2051 """ # noqa: E501 

2052 prop_t: RelationshipProperty[Any] 

2053 

2054 if isinstance(prop, str): 

2055 raise sa_exc.ArgumentError( 

2056 "with_parent() accepts class-bound mapped attributes, not strings" 

2057 ) 

2058 elif isinstance(prop, attributes.QueryableAttribute): 

2059 if prop._of_type: 

2060 from_entity = prop._of_type 

2061 mapper_property = prop.property 

2062 if mapper_property is None or not prop_is_relationship( 

2063 mapper_property 

2064 ): 

2065 raise sa_exc.ArgumentError( 

2066 f"Expected relationship property for with_parent(), " 

2067 f"got {mapper_property}" 

2068 ) 

2069 prop_t = mapper_property 

2070 else: 

2071 prop_t = prop 

2072 

2073 return prop_t._with_parent(instance, from_entity=from_entity) 

2074 

2075 

2076def has_identity(object_: object) -> bool: 

2077 """Return True if the given object has a database 

2078 identity. 

2079 

2080 This typically corresponds to the object being 

2081 in either the persistent or detached state. 

2082 

2083 .. seealso:: 

2084 

2085 :func:`.was_deleted` 

2086 

2087 """ 

2088 state = attributes.instance_state(object_) 

2089 return state.has_identity 

2090 

2091 

2092def was_deleted(object_: object) -> bool: 

2093 """Return True if the given object was deleted 

2094 within a session flush. 

2095 

2096 This is regardless of whether or not the object is 

2097 persistent or detached. 

2098 

2099 .. seealso:: 

2100 

2101 :attr:`.InstanceState.was_deleted` 

2102 

2103 """ 

2104 

2105 state = attributes.instance_state(object_) 

2106 return state.was_deleted 

2107 

2108 

2109def _entity_corresponds_to( 

2110 given: _InternalEntityType[Any], entity: _InternalEntityType[Any] 

2111) -> bool: 

2112 """determine if 'given' corresponds to 'entity', in terms 

2113 of an entity passed to Query that would match the same entity 

2114 being referred to elsewhere in the query. 

2115 

2116 """ 

2117 if insp_is_aliased_class(entity): 

2118 if insp_is_aliased_class(given): 

2119 if entity._base_alias() is given._base_alias(): 

2120 return True 

2121 return False 

2122 elif insp_is_aliased_class(given): 

2123 if given._use_mapper_path: 

2124 return entity in given.with_polymorphic_mappers 

2125 else: 

2126 return entity is given 

2127 

2128 assert insp_is_mapper(given) 

2129 return entity.common_parent(given) 

2130 

2131 

2132def _entity_corresponds_to_use_path_impl( 

2133 given: _InternalEntityType[Any], entity: _InternalEntityType[Any] 

2134) -> bool: 

2135 """determine if 'given' corresponds to 'entity', in terms 

2136 of a path of loader options where a mapped attribute is taken to 

2137 be a member of a parent entity. 

2138 

2139 e.g.:: 

2140 

2141 someoption(A).someoption(A.b) # -> fn(A, A) -> True 

2142 someoption(A).someoption(C.d) # -> fn(A, C) -> False 

2143 

2144 a1 = aliased(A) 

2145 someoption(a1).someoption(A.b) # -> fn(a1, A) -> False 

2146 someoption(a1).someoption(a1.b) # -> fn(a1, a1) -> True 

2147 

2148 wp = with_polymorphic(A, [A1, A2]) 

2149 someoption(wp).someoption(A1.foo) # -> fn(wp, A1) -> False 

2150 someoption(wp).someoption(wp.A1.foo) # -> fn(wp, wp.A1) -> True 

2151 

2152 """ 

2153 if insp_is_aliased_class(given): 

2154 return ( 

2155 insp_is_aliased_class(entity) 

2156 and not entity._use_mapper_path 

2157 and (given is entity or entity in given._with_polymorphic_entities) 

2158 ) 

2159 elif not insp_is_aliased_class(entity): 

2160 return given.isa(entity.mapper) 

2161 else: 

2162 return ( 

2163 entity._use_mapper_path 

2164 and given in entity.with_polymorphic_mappers 

2165 ) 

2166 

2167 

2168def _entity_isa(given: _InternalEntityType[Any], mapper: Mapper[Any]) -> bool: 

2169 """determine if 'given' "is a" mapper, in terms of the given 

2170 would load rows of type 'mapper'. 

2171 

2172 """ 

2173 if given.is_aliased_class: 

2174 return mapper in given.with_polymorphic_mappers or given.mapper.isa( 

2175 mapper 

2176 ) 

2177 elif given.with_polymorphic_mappers: 

2178 return mapper in given.with_polymorphic_mappers or given.isa(mapper) 

2179 else: 

2180 return given.isa(mapper) 

2181 

2182 

2183def _getitem(iterable_query: Query[Any], item: Any) -> Any: 

2184 """calculate __getitem__ in terms of an iterable query object 

2185 that also has a slice() method. 

2186 

2187 """ 

2188 

2189 def _no_negative_indexes(): 

2190 raise IndexError( 

2191 "negative indexes are not accepted by SQL " 

2192 "index / slice operators" 

2193 ) 

2194 

2195 if isinstance(item, slice): 

2196 start, stop, step = util.decode_slice(item) 

2197 

2198 if ( 

2199 isinstance(stop, int) 

2200 and isinstance(start, int) 

2201 and stop - start <= 0 

2202 ): 

2203 return [] 

2204 

2205 elif (isinstance(start, int) and start < 0) or ( 

2206 isinstance(stop, int) and stop < 0 

2207 ): 

2208 _no_negative_indexes() 

2209 

2210 res = iterable_query.slice(start, stop) 

2211 if step is not None: 

2212 return list(res)[None : None : item.step] 

2213 else: 

2214 return list(res) 

2215 else: 

2216 if item == -1: 

2217 _no_negative_indexes() 

2218 else: 

2219 return list(iterable_query[item : item + 1])[0] 

2220 

2221 

2222def _is_mapped_annotation( 

2223 raw_annotation: _AnnotationScanType, 

2224 cls: Type[Any], 

2225 originating_cls: Type[Any], 

2226) -> bool: 

2227 try: 

2228 annotated = de_stringify_annotation( 

2229 cls, raw_annotation, originating_cls.__module__ 

2230 ) 

2231 except NameError: 

2232 # in most cases, at least within our own tests, we can raise 

2233 # here, which is more accurate as it prevents us from returning 

2234 # false negatives. However, in the real world, try to avoid getting 

2235 # involved with end-user annotations that have nothing to do with us. 

2236 # see issue #8888 where we bypass using this function in the case 

2237 # that we want to detect an unresolvable Mapped[] type. 

2238 return False 

2239 else: 

2240 return is_origin_of_cls(annotated, _MappedAnnotationBase) 

2241 

2242 

2243class _CleanupError(Exception): 

2244 pass 

2245 

2246 

2247def _cleanup_mapped_str_annotation( 

2248 annotation: str, originating_module: str 

2249) -> str: 

2250 # fix up an annotation that comes in as the form: 

2251 # 'Mapped[List[Address]]' so that it instead looks like: 

2252 # 'Mapped[List["Address"]]' , which will allow us to get 

2253 # "Address" as a string 

2254 

2255 # additionally, resolve symbols for these names since this is where 

2256 # we'd have to do it 

2257 

2258 inner: Optional[Match[str]] 

2259 

2260 mm = re.match(r"^([^ \|]+?)\[(.+)\]$", annotation) 

2261 

2262 if not mm: 

2263 return annotation 

2264 

2265 # ticket #8759. Resolve the Mapped name to a real symbol. 

2266 # originally this just checked the name. 

2267 try: 

2268 obj = eval_name_only(mm.group(1), originating_module) 

2269 except NameError as ne: 

2270 raise _CleanupError( 

2271 f'For annotation "{annotation}", could not resolve ' 

2272 f'container type "{mm.group(1)}". ' 

2273 "Please ensure this type is imported at the module level " 

2274 "outside of TYPE_CHECKING blocks" 

2275 ) from ne 

2276 

2277 if obj is typing.ClassVar: 

2278 real_symbol = "ClassVar" 

2279 else: 

2280 try: 

2281 if issubclass(obj, _MappedAnnotationBase): 

2282 real_symbol = obj.__name__ 

2283 else: 

2284 return annotation 

2285 except TypeError: 

2286 # avoid isinstance(obj, type) check, just catch TypeError 

2287 return annotation 

2288 

2289 # note: if one of the codepaths above didn't define real_symbol and 

2290 # then didn't return, real_symbol raises UnboundLocalError 

2291 # which is actually a NameError, and the calling routines don't 

2292 # notice this since they are catching NameError anyway. Just in case 

2293 # this is being modified in the future, something to be aware of. 

2294 

2295 stack = [] 

2296 inner = mm 

2297 while True: 

2298 stack.append(real_symbol if mm is inner else inner.group(1)) 

2299 g2 = inner.group(2) 

2300 inner = re.match(r"^([^ \|]+?)\[(.+)\]$", g2) 

2301 if inner is None: 

2302 stack.append(g2) 

2303 break 

2304 

2305 # stacks we want to rewrite, that is, quote the last entry which 

2306 # we think is a relationship class name: 

2307 # 

2308 # ['Mapped', 'List', 'Address'] 

2309 # ['Mapped', 'A'] 

2310 # 

2311 # stacks we dont want to rewrite, which are generally MappedColumn 

2312 # use cases: 

2313 # 

2314 # ['Mapped', "'Optional[Dict[str, str]]'"] 

2315 # ['Mapped', 'dict[str, str] | None'] 

2316 

2317 if ( 

2318 # avoid already quoted symbols such as 

2319 # ['Mapped', "'Optional[Dict[str, str]]'"] 

2320 not re.match(r"""^["'].*["']$""", stack[-1]) 

2321 # avoid further generics like Dict[] such as 

2322 # ['Mapped', 'dict[str, str] | None'], 

2323 # ['Mapped', 'list[int] | list[str]'], 

2324 # ['Mapped', 'Union[list[int], list[str]]'], 

2325 and not re.search(r"[\[\]]", stack[-1]) 

2326 ): 

2327 stripchars = "\"' " 

2328 stack[-1] = ", ".join( 

2329 f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",") 

2330 ) 

2331 

2332 annotation = "[".join(stack) + ("]" * (len(stack) - 1)) 

2333 

2334 return annotation 

2335 

2336 

2337def _extract_mapped_subtype( 

2338 raw_annotation: Optional[_AnnotationScanType], 

2339 cls: type, 

2340 originating_module: str, 

2341 key: str, 

2342 attr_cls: Type[Any], 

2343 required: bool, 

2344 is_dataclass_field: bool, 

2345 expect_mapped: bool = True, 

2346 raiseerr: bool = True, 

2347) -> Optional[Tuple[Union[_AnnotationScanType, str], Optional[type]]]: 

2348 """given an annotation, figure out if it's ``Mapped[something]`` and if 

2349 so, return the ``something`` part. 

2350 

2351 Includes error raise scenarios and other options. 

2352 

2353 """ 

2354 

2355 if raw_annotation is None: 

2356 if required: 

2357 raise orm_exc.MappedAnnotationError( 

2358 f"Python typing annotation is required for attribute " 

2359 f'"{cls.__name__}.{key}" when primary argument(s) for ' 

2360 f'"{attr_cls.__name__}" construct are None or not present' 

2361 ) 

2362 return None 

2363 

2364 try: 

2365 # destringify the "outside" of the annotation. note we are not 

2366 # adding include_generic so it will *not* dig into generic contents, 

2367 # which will remain as ForwardRef or plain str under future annotations 

2368 # mode. The full destringify happens later when mapped_column goes 

2369 # to do a full lookup in the registry type_annotations_map. 

2370 annotated = de_stringify_annotation( 

2371 cls, 

2372 raw_annotation, 

2373 originating_module, 

2374 str_cleanup_fn=_cleanup_mapped_str_annotation, 

2375 ) 

2376 except _CleanupError as ce: 

2377 raise orm_exc.MappedAnnotationError( 

2378 f"Could not interpret annotation {raw_annotation}. " 

2379 "Check that it uses names that are correctly imported at the " 

2380 "module level. See chained stack trace for more hints." 

2381 ) from ce 

2382 except NameError as ne: 

2383 if raiseerr and "Mapped[" in raw_annotation: # type: ignore 

2384 raise orm_exc.MappedAnnotationError( 

2385 f"Could not interpret annotation {raw_annotation}. " 

2386 "Check that it uses names that are correctly imported at the " 

2387 "module level. See chained stack trace for more hints." 

2388 ) from ne 

2389 

2390 annotated = raw_annotation # type: ignore 

2391 

2392 if is_dataclass_field: 

2393 return annotated, None 

2394 else: 

2395 if not hasattr(annotated, "__origin__") or not is_origin_of_cls( 

2396 annotated, _MappedAnnotationBase 

2397 ): 

2398 if expect_mapped: 

2399 if not raiseerr: 

2400 return None 

2401 

2402 origin = getattr(annotated, "__origin__", None) 

2403 if origin is typing.ClassVar: 

2404 return None 

2405 

2406 # check for other kind of ORM descriptor like AssociationProxy, 

2407 # don't raise for that (issue #9957) 

2408 elif isinstance(origin, type) and issubclass( 

2409 origin, ORMDescriptor 

2410 ): 

2411 return None 

2412 

2413 raise orm_exc.MappedAnnotationError( 

2414 f'Type annotation for "{cls.__name__}.{key}" ' 

2415 "can't be correctly interpreted for " 

2416 "Annotated Declarative Table form. ORM annotations " 

2417 "should normally make use of the ``Mapped[]`` generic " 

2418 "type, or other ORM-compatible generic type, as a " 

2419 "container for the actual type, which indicates the " 

2420 "intent that the attribute is mapped. " 

2421 "Class variables that are not intended to be mapped " 

2422 "by the ORM should use ClassVar[]. " 

2423 "To allow Annotated Declarative to disregard legacy " 

2424 "annotations which don't use Mapped[] to pass, set " 

2425 '"__allow_unmapped__ = True" on the class or a ' 

2426 "superclass this class.", 

2427 code="zlpr", 

2428 ) 

2429 

2430 else: 

2431 return annotated, None 

2432 

2433 generic_annotated = cast(GenericProtocol[Any], annotated) 

2434 if len(generic_annotated.__args__) != 1: 

2435 raise orm_exc.MappedAnnotationError( 

2436 "Expected sub-type for Mapped[] annotation" 

2437 ) 

2438 

2439 return ( 

2440 # fix dict/list/set args to be ForwardRef, see #11814 

2441 fixup_container_fwd_refs(generic_annotated.__args__[0]), 

2442 generic_annotated.__origin__, 

2443 ) 

2444 

2445 

2446def _mapper_property_as_plain_name(prop: Type[Any]) -> str: 

2447 if hasattr(prop, "_mapper_property_name"): 

2448 name = prop._mapper_property_name() 

2449 else: 

2450 name = None 

2451 return util.clsname_as_plain_name(prop, name)