Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/util.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

814 statements  

1# orm/util.py 

2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9from __future__ import annotations 

10 

11import enum 

12import functools 

13import re 

14import types 

15import typing 

16from typing import AbstractSet 

17from typing import Any 

18from typing import Callable 

19from typing import cast 

20from typing import Dict 

21from typing import FrozenSet 

22from typing import Generic 

23from typing import Iterable 

24from typing import Iterator 

25from typing import List 

26from typing import Match 

27from typing import Optional 

28from typing import Protocol 

29from typing import Sequence 

30from typing import Tuple 

31from typing import Type 

32from typing import TYPE_CHECKING 

33from typing import TypeVar 

34from typing import Union 

35import weakref 

36 

37from . import attributes # noqa 

38from . import exc 

39from ._typing import _O 

40from ._typing import insp_is_aliased_class 

41from ._typing import insp_is_mapper 

42from ._typing import prop_is_relationship 

43from .base import _class_to_mapper as _class_to_mapper 

44from .base import _MappedAnnotationBase 

45from .base import _never_set as _never_set # noqa: F401 

46from .base import _none_set as _none_set # noqa: F401 

47from .base import attribute_str as attribute_str # noqa: F401 

48from .base import class_mapper as class_mapper 

49from .base import DynamicMapped 

50from .base import InspectionAttr as InspectionAttr 

51from .base import instance_str as instance_str # noqa: F401 

52from .base import Mapped 

53from .base import object_mapper as object_mapper 

54from .base import object_state as object_state # noqa: F401 

55from .base import opt_manager_of_class 

56from .base import ORMDescriptor 

57from .base import state_attribute_str as state_attribute_str # noqa: F401 

58from .base import state_class_str as state_class_str # noqa: F401 

59from .base import state_str as state_str # noqa: F401 

60from .base import WriteOnlyMapped 

61from .interfaces import CriteriaOption 

62from .interfaces import MapperProperty as MapperProperty 

63from .interfaces import ORMColumnsClauseRole 

64from .interfaces import ORMEntityColumnsClauseRole 

65from .interfaces import ORMFromClauseRole 

66from .path_registry import PathRegistry as PathRegistry 

67from .. import event 

68from .. import exc as sa_exc 

69from .. import inspection 

70from .. import sql 

71from .. import util 

72from ..engine.result import result_tuple 

73from ..sql import coercions 

74from ..sql import expression 

75from ..sql import lambdas 

76from ..sql import roles 

77from ..sql import util as sql_util 

78from ..sql import visitors 

79from ..sql._typing import is_selectable 

80from ..sql.annotation import SupportsCloneAnnotations 

81from ..sql.base import ColumnCollection 

82from ..sql.cache_key import HasCacheKey 

83from ..sql.cache_key import MemoizedHasCacheKey 

84from ..sql.elements import ColumnElement 

85from ..sql.elements import KeyedColumnElement 

86from ..sql.selectable import FromClause 

87from ..util.langhelpers import MemoizedSlots 

88from ..util.typing import de_stringify_annotation as _de_stringify_annotation 

89from ..util.typing import ( 

90 de_stringify_union_elements as _de_stringify_union_elements, 

91) 

92from ..util.typing import eval_name_only as _eval_name_only 

93from ..util.typing import fixup_container_fwd_refs 

94from ..util.typing import is_origin_of_cls 

95from ..util.typing import Literal 

96from ..util.typing import TupleAny 

97from ..util.typing import typing_get_origin 

98from ..util.typing import Unpack 

99 

100if typing.TYPE_CHECKING: 

101 from ._typing import _EntityType 

102 from ._typing import _IdentityKeyType 

103 from ._typing import _InternalEntityType 

104 from ._typing import _ORMCOLEXPR 

105 from .context import _MapperEntity 

106 from .context import ORMCompileState 

107 from .mapper import Mapper 

108 from .path_registry import AbstractEntityRegistry 

109 from .query import Query 

110 from .relationships import RelationshipProperty 

111 from ..engine import Row 

112 from ..engine import RowMapping 

113 from ..sql._typing import _CE 

114 from ..sql._typing import _ColumnExpressionArgument 

115 from ..sql._typing import _EquivalentColumnMap 

116 from ..sql._typing import _FromClauseArgument 

117 from ..sql._typing import _OnClauseArgument 

118 from ..sql._typing import _PropagateAttrsType 

119 from ..sql.annotation import _SA 

120 from ..sql.base import ReadOnlyColumnCollection 

121 from ..sql.elements import BindParameter 

122 from ..sql.selectable import _ColumnsClauseElement 

123 from ..sql.selectable import Select 

124 from ..sql.selectable import Selectable 

125 from ..sql.visitors import anon_map 

126 from ..util.typing import _AnnotationScanType 

127 from ..util.typing import ArgsTypeProcotol 

128 

# Generic type variable used in this module's annotations, e.g. by
# ``identity_key()`` to tie the mapped class to the returned key type.
_T = TypeVar("_T", bound=Any)

# Every name accepted as a cascade option; ``CascadeOptions`` uses this
# set as its ``_allowed_cascades`` and derives the expansion of "all"
# from it.
all_cascades = frozenset(
    (
        "delete",
        "delete-orphan",
        "all",
        "merge",
        "expunge",
        "save-update",
        "refresh-expire",
        "none",
    )
)


# A partial *of* ``functools.partial``: calling
# ``_de_stringify_partial(fn)`` returns ``functools.partial(fn, locals_=...)``
# where ``locals_`` is an immutable mapping of the ORM annotation names
# ``Mapped``, ``WriteOnlyMapped`` and ``DynamicMapped`` to their classes.
_de_stringify_partial = functools.partial(
    functools.partial,
    locals_=util.immutabledict(
        {
            "Mapped": Mapped,
            "WriteOnlyMapped": WriteOnlyMapped,
            "DynamicMapped": DynamicMapped,
        }
    ),
)

155 

156# partial is practically useless as we have to write out the whole 

157# function and maintain the signature anyway 

158 

159 

class _DeStringifyAnnotation(Protocol):
    """Typing-only call signature for :func:`.de_stringify_annotation`.

    The module-level ``de_stringify_annotation`` callable is a
    ``functools.partial``; it is cast to this protocol so that type
    checkers see an explicit signature.

    """

    def __call__(
        self,
        cls: Type[Any],
        annotation: _AnnotationScanType,
        originating_module: str,
        *,
        str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
        include_generic: bool = False,
    ) -> Type[Any]:
        """Signature placeholder only; the protocol has no behavior."""

170 

171 

# ORM-flavoured variant of the ``de_stringify_annotation`` helper imported
# from ``..util.typing``: ``_de_stringify_partial`` wraps it in a
# ``functools.partial`` with ``locals_`` pre-bound to the ``Mapped`` /
# ``WriteOnlyMapped`` / ``DynamicMapped`` names.  ``cast()`` only gives
# type checkers the explicit ``_DeStringifyAnnotation`` signature.
de_stringify_annotation = cast(
    _DeStringifyAnnotation, _de_stringify_partial(_de_stringify_annotation)
)

175 

176 

class _DeStringifyUnionElements(Protocol):
    """Typing-only call signature for
    :func:`.de_stringify_union_elements`.

    The module-level ``de_stringify_union_elements`` callable is a
    ``functools.partial``; it is cast to this protocol so that type
    checkers see an explicit signature.

    """

    def __call__(
        self,
        cls: Type[Any],
        annotation: ArgsTypeProcotol,
        originating_module: str,
        *,
        str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
    ) -> Type[Any]:
        """Signature placeholder only; the protocol has no behavior."""

186 

187 

# ORM-flavoured variant of the ``de_stringify_union_elements`` helper
# imported from ``..util.typing``, with ``locals_`` pre-bound by
# ``_de_stringify_partial``.  ``cast()`` only gives type checkers the
# explicit ``_DeStringifyUnionElements`` signature.
de_stringify_union_elements = cast(
    _DeStringifyUnionElements,
    _de_stringify_partial(_de_stringify_union_elements),
)

192 

193 

class _EvalNameOnly(Protocol):
    """Typing-only call signature for :func:`.eval_name_only`.

    The module-level ``eval_name_only`` callable is a
    ``functools.partial``; it is cast to this protocol so that type
    checkers see an explicit signature.

    """

    def __call__(self, name: str, module_name: str) -> Any:
        """Signature placeholder only; the protocol has no behavior."""

196 

197 

# ORM-flavoured variant of the ``eval_name_only`` helper imported from
# ``..util.typing``, with ``locals_`` pre-bound by ``_de_stringify_partial``.
# ``cast()`` only gives type checkers the ``_EvalNameOnly`` signature.
eval_name_only = cast(_EvalNameOnly, _de_stringify_partial(_eval_name_only))

199 

200 

class CascadeOptions(FrozenSet[str]):
    """Immutable set of the cascade names passed to
    :paramref:`.relationship.cascade`.

    Besides behaving as a ``frozenset`` of option names, every instance
    carries one boolean attribute per individual cascade
    (``save_update``, ``delete``, ``refresh_expire``, ``merge``,
    ``expunge``, ``delete_orphan``).

    """

    # the options that the "all" shorthand expands to
    _add_w_all_cascades = all_cascades.difference(
        ["all", "none", "delete-orphan"]
    )
    _allowed_cascades = all_cascades

    _viewonly_cascades = ["expunge", "all", "none", "refresh-expire", "merge"]

    __slots__ = (
        "save_update",
        "delete",
        "refresh_expire",
        "merge",
        "expunge",
        "delete_orphan",
    )

    save_update: bool
    delete: bool
    refresh_expire: bool
    merge: bool
    expunge: bool
    delete_orphan: bool

    def __new__(
        cls, value_list: Optional[Union[Iterable[str], str]]
    ) -> CascadeOptions:
        """Build the option set.

        :param value_list: an iterable of option names, a comma-separated
         string of option names, or ``None`` for no options.
        :raises sqlalchemy.exc.ArgumentError: if a name is not one of the
         allowed cascades.

        """
        # strings (and None) take the parsing route, which calls back into
        # this constructor with a list
        if value_list is None or isinstance(value_list, str):
            return cls.from_string(value_list)  # type: ignore

        values = set(value_list)
        unknown = values.difference(cls._allowed_cascades)
        if unknown:
            raise sa_exc.ArgumentError(
                "Invalid cascade option(s): %s"
                % ", ".join(map(repr, sorted(unknown)))
            )

        # "all" expands to its member cascades; "none" wipes everything,
        # including anything "all" just added
        if "all" in values:
            values.update(cls._add_w_all_cascades)
        if "none" in values:
            values.clear()
        values.discard("all")

        self = super().__new__(cls, values)

        flag_for_option = (
            ("save_update", "save-update"),
            ("delete", "delete"),
            ("refresh_expire", "refresh-expire"),
            ("merge", "merge"),
            ("expunge", "expunge"),
            ("delete_orphan", "delete-orphan"),
        )
        for attr_name, option in flag_for_option:
            setattr(self, attr_name, option in values)

        if self.delete_orphan and not self.delete:
            util.warn("The 'delete-orphan' cascade option requires 'delete'.")
        return self

    def __repr__(self):
        joined = ",".join(sorted(self))
        return "CascadeOptions(%r)" % (joined,)

    @classmethod
    def from_string(cls, arg):
        """Parse a comma-separated string of option names.

        Whitespace around commas is ignored, empty segments are dropped
        and ``None`` is treated like the empty string.

        """
        segments = re.split(r"\s*,\s*", arg or "")
        return cls(list(filter(None, segments)))

272 

273 

def _validator_events(desc, key, validator, include_removes, include_backrefs):
    """Attach attribute event listeners that pass values being set or
    appended through ``validator``.

    Listeners are registered on ``desc`` for the "append", "bulk_replace"
    and "set" events, plus "remove" when ``include_removes`` is true.

    When ``include_removes`` is true the validator is called as
    ``validator(obj, key, value, is_remove)``; otherwise it is called as
    ``validator(obj, key, value)``.  When ``include_backrefs`` is false,
    events whose initiator is a different attribute implementation than
    the one registered under ``key`` are passed through unvalidated.
    """

    # extra positional flag sent for non-remove operations, present only
    # when the validator opted in to receiving removes
    not_a_remove = (False,) if include_removes else ()

    def _should_validate(state, initiator):
        if include_backrefs:
            return True
        own_impl = state.manager[key].impl
        return initiator.impl is own_impl

    def append(state, value, initiator):
        # appends that are part of a bulk replace are handled by bulk_set
        if initiator.op is attributes.OP_BULK_REPLACE:
            return value
        if not _should_validate(state, initiator):
            return value
        return validator(state.obj(), key, value, *not_a_remove)

    def bulk_set(state, values, initiator):
        if _should_validate(state, initiator):
            target = state.obj()
            # replace the contents in place so the caller's list is updated
            values[:] = [
                validator(target, key, item, *not_a_remove) for item in values
            ]

    def set_(state, value, oldvalue, initiator):
        if not _should_validate(state, initiator):
            return value
        return validator(state.obj(), key, value, *not_a_remove)

    event.listen(desc, "append", append, raw=True, retval=True)
    event.listen(desc, "bulk_replace", bulk_set, raw=True)
    event.listen(desc, "set", set_, raw=True, retval=True)

    if include_removes:

        def remove(state, value, initiator):
            if _should_validate(state, initiator):
                validator(state.obj(), key, value, True)

        event.listen(desc, "remove", remove, raw=True, retval=True)

338 

339 

def polymorphic_union(
    table_map, typecolname, aliasname="p_union", cast_nulls=True
):
    """Build the aliased ``UNION ALL`` statement a polymorphic mapper
    selects from.

    See :ref:`concrete_inheritance` for an example of how
    this is used.

    :param table_map: mapping of polymorphic identities to
     :class:`_schema.Table` objects.  Each value is coerced to a FROM
     clause and stored back into the mapping.
    :param typecolname: string name of a "discriminator" column, which will be
     derived from the query, producing the polymorphic identity for
     each row.  If ``None``, no polymorphic discriminator is generated.
    :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()`
     construct generated.
    :param cast_nulls: if True, non-existent columns, which are represented
     as labeled NULLs, will be passed into CAST.   This is a legacy behavior
     that is problematic on some backends such as Oracle - in which case it
     can be set to False.
    :raises sqlalchemy.exc.InvalidRequestError: if one of the tables already
     has a column whose key equals ``typecolname``.

    """

    all_keys: util.OrderedSet[str] = util.OrderedSet()
    columns_by_table = {}
    type_by_key = {}

    # first pass: collect the union of all column keys and remember which
    # table supplies which column
    for identity in table_map:
        from_clause = coercions.expect(
            roles.StrictFromClauseRole, table_map[identity], allow_select=True
        )
        table_map[identity] = from_clause

        own_columns = {}
        for column in from_clause.c:
            if column.key == typecolname:
                raise sa_exc.InvalidRequestError(
                    "Polymorphic union can't use '%s' as the discriminator "
                    "column due to mapped column %r; please apply the "
                    "'typecolname' "
                    "argument; this is available on "
                    "ConcreteBase as '_concrete_discriminator_name'"
                    % (typecolname, column)
                )
            all_keys.add(column.key)
            own_columns[column.key] = column
            type_by_key[column.key] = column.type
        columns_by_table[from_clause] = own_columns

    def _column_or_null(name, from_clause):
        # a table lacking the column contributes a typed, labeled NULL
        try:
            return columns_by_table[from_clause][name]
        except KeyError:
            make_null = sql.cast if cast_nulls else sql.type_coerce
            return make_null(sql.null(), type_by_key[name]).label(name)

    # second pass: one SELECT per table, all with the same column list
    selects = []
    for identity, from_clause in table_map.items():
        columns = [_column_or_null(name, from_clause) for name in all_keys]
        if typecolname is not None:
            discriminator = sql.literal_column(
                sql_util._quote_ddl_expr(identity)
            ).label(typecolname)
            columns.append(discriminator)
        selects.append(sql.select(*columns).select_from(from_clause))

    return sql.union_all(*selects).alias(aliasname)

420 

421 

def identity_key(
    class_: Optional[Type[_T]] = None,
    ident: Union[Any, Tuple[Any, ...]] = None,
    *,
    instance: Optional[_T] = None,
    row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None,
    identity_token: Optional[Any] = None,
) -> _IdentityKeyType[_T]:
    r"""Produce an "identity key" tuple of the kind used as a key in the
    :attr:`.Session.identity_map` dictionary.

    Three calling styles are supported:

    * ``identity_key(class, ident, identity_token=token)``

      Receives a mapped class together with a primary key, given either
      as a scalar or as a tuple.

      E.g.::

        >>> identity_key(MyClass, (1, 2))
        (<class '__main__.MyClass'>, (1, 2), None)

      :param class: mapped class (must be a positional argument)
      :param ident: primary key, may be a scalar or tuple argument.
      :param identity_token: optional identity token

        .. versionadded:: 1.2 added identity_token


    * ``identity_key(instance=instance)``

      Produces the identity key for the given instance.  The instance
      does not have to be persistent; it only needs its primary key
      attributes populated (the key will contain ``None`` for any that
      are missing).

      E.g.::

        >>> instance = MyClass(1, 2)
        >>> identity_key(instance=instance)
        (<class '__main__.MyClass'>, (1, 2), None)

      In this form, the given instance is ultimately run through
      :meth:`_orm.Mapper.identity_key_from_instance`, which will have the
      effect of performing a database check for the corresponding row
      if the object is expired.

      :param instance: object instance (must be given as a keyword arg)

    * ``identity_key(class, row=row, identity_token=token)``

      Like the class/tuple form, but the primary key is taken from a
      database result row given as a :class:`.Row` or
      :class:`.RowMapping` object.

      E.g.::

        >>> row = engine.execute(\
            text("select * from table where a=1 and b=2")\
            ).first()
        >>> identity_key(MyClass, row=row)
        (<class '__main__.MyClass'>, (1, 2), None)

      :param class: mapped class (must be a positional argument)
      :param row: :class:`.Row` row returned by a :class:`_engine.CursorResult`
       (must be given as a keyword arg)
      :param identity_token: optional identity token

        .. versionadded:: 1.2 added identity_token

    :raises sqlalchemy.exc.ArgumentError: if neither a class nor an
     instance is given, or a class is given without ``ident`` or ``row``.

    """
    if class_ is None:
        # instance form
        if instance is None:
            raise sa_exc.ArgumentError("class or instance is required")
        return object_mapper(instance).identity_key_from_instance(instance)

    # class form; a row, when present, takes precedence over ident
    mapper = class_mapper(class_)
    if row is not None:
        return mapper.identity_key_from_row(
            row, identity_token=identity_token
        )

    if ident is None:
        raise sa_exc.ArgumentError("ident or row is required")
    primary_key = tuple(util.to_list(ident))
    return mapper.identity_key_from_primary_key(
        primary_key, identity_token=identity_token
    )

510 

511 

class _TraceAdaptRole(enum.Enum):
    """Enumeration of all the use cases for ORMAdapter.

    ORMAdapter remains one of the most complicated aspects of the ORM, as it is
    used for in-place adaption of column expressions to be applied to a SELECT,
    replacing :class:`.Table` and other objects that are mapped to classes with
    aliases of those tables in the case of joined eager loading, or in the case
    of polymorphic loading as used with concrete mappings or other custom "with
    polymorphic" parameters, with whole user-defined subqueries. The
    enumerations provide an overview of all the use cases used by ORMAdapter, a
    layer of formality as to the introduction of new ORMAdapter use cases (of
    which none are anticipated), as well as a means to trace the origins of a
    particular ORMAdapter within runtime debugging.

    SQLAlchemy 2.0 has greatly scaled back ORM features which relied heavily on
    open-ended statement adaption, including the ``Query.with_polymorphic()``
    method and the ``Query.select_from_entity()`` methods, favoring
    user-explicit aliasing schemes using the ``aliased()`` and
    ``with_polymorphic()`` standalone constructs; these still use adaption,
    however the adaption is applied in a narrower scope.

    Member values are assigned with ``enum.auto()``, so they follow the
    declaration order below.

    """

    # --- aliased() ---
    # aliased() use that is used to adapt individual attributes at query
    # construction time
    ALIASED_INSP = enum.auto()

    # --- joined eager loading ---
    # joinedload cases; typically adapt an ON clause of a relationship
    # join
    JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto()
    JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto()
    JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto()

    # --- polymorphic loading ---
    # polymorphic cases - these are complex ones that replace FROM
    # clauses, replacing tables with subqueries
    MAPPER_POLYMORPHIC_ADAPTER = enum.auto()
    WITH_POLYMORPHIC_ADAPTER = enum.auto()
    WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto()
    DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto()

    # --- statement-level adapters ---
    # the from_statement() case, used only to adapt individual attributes
    # from a given statement to local ORM attributes at result fetching
    # time.  assigned to ORMCompileState._from_obj_alias
    ADAPT_FROM_STATEMENT = enum.auto()

    # the joinedload for queries that have LIMIT/OFFSET/DISTINCT case;
    # the query is placed inside of a subquery with the LIMIT/OFFSET/etc.,
    # joinedloads are then placed on the outside.
    # assigned to ORMCompileState.compound_eager_adapter
    COMPOUND_EAGER_STATEMENT = enum.auto()

    # the legacy Query._set_select_from() case.
    # this is needed for Query's set operations (i.e. UNION, etc. )
    # as well as "legacy from_self()", which while removed from 2.0 as
    # public API, is used for the Query.count() method.  this one
    # still does full statement traversal
    # assigned to ORMCompileState._from_obj_alias
    LEGACY_SELECT_FROM_ALIAS = enum.auto()

570 

571 

class ORMStatementAdapter(sql_util.ColumnAdapter):
    """ColumnAdapter which includes a role attribute.

    The ``role`` is a :class:`._TraceAdaptRole` member recording which use
    case created the adapter; every other argument is forwarded unchanged
    to ``ColumnAdapter``.

    """

    __slots__ = ("role",)

    def __init__(
        self,
        role: _TraceAdaptRole,
        selectable: Selectable,
        *,
        equivalents: Optional[_EquivalentColumnMap] = None,
        adapt_required: bool = False,
        allow_label_resolve: bool = True,
        anonymize_labels: bool = False,
        adapt_on_names: bool = False,
        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
    ):
        """Record ``role`` and delegate the remaining arguments to
        ``ColumnAdapter.__init__``.

        :param role: the :class:`._TraceAdaptRole` this adapter serves.
        :param selectable: passed positionally to the base adapter.

        All keyword-only arguments are passed through to the base class
        under the same names.

        """
        self.role = role
        super().__init__(
            selectable,
            equivalents=equivalents,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            adapt_on_names=adapt_on_names,
            adapt_from_selectables=adapt_from_selectables,
        )

599 

600 

class ORMAdapter(sql_util.ColumnAdapter):
    """ColumnAdapter subclass that is bound to an ORM entity and, by
    default, skips adaptation of elements belonging to unrelated mappers.

    """

    __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp")

    is_aliased_class: bool
    aliased_insp: Optional[AliasedInsp[Any]]

    def __init__(
        self,
        role: _TraceAdaptRole,
        entity: _InternalEntityType[Any],
        *,
        equivalents: Optional[_EquivalentColumnMap] = None,
        adapt_required: bool = False,
        allow_label_resolve: bool = True,
        anonymize_labels: bool = False,
        selectable: Optional[Selectable] = None,
        limit_on_entity: bool = True,
        adapt_on_names: bool = False,
        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
    ):
        """Construct the adapter for ``entity``.

        :param role: the :class:`._TraceAdaptRole` this adapter serves.
        :param entity: inspected entity; its ``mapper`` is stored and its
         ``selectable`` is the default adaptation target.
        :param selectable: explicit adaptation target overriding
         ``entity.selectable``.
        :param limit_on_entity: when true, ``_include_fn`` is installed as
         the base adapter's ``include_fn``.

        The remaining keyword arguments are passed through to
        ``ColumnAdapter.__init__`` under the same names.

        """
        self.role = role
        self.mapper = entity.mapper

        if insp_is_aliased_class(entity):
            self.is_aliased_class = True
            self.aliased_insp = entity
        else:
            self.is_aliased_class = False
            self.aliased_insp = None

        target = entity.selectable if selectable is None else selectable

        if limit_on_entity:
            include_fn = self._include_fn
        else:
            include_fn = None

        super().__init__(
            target,
            equivalents=equivalents,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            include_fn=include_fn,
            adapt_on_names=adapt_on_names,
            adapt_from_selectables=adapt_from_selectables,
        )

    def _include_fn(self, elem):
        """Return a true value if ``elem`` may be adapted: it either has no
        "parentmapper" annotation, or that mapper and this adapter's
        mapper are related through ``isa()`` in either direction.
        """
        parent = elem._annotations.get("parentmapper", None)
        if not parent:
            return True
        return parent.isa(self.mapper) or self.mapper.isa(parent)

652 

653 

class AliasedClass(
    inspection.Inspectable["AliasedInsp[_O]"], ORMColumnsClauseRole[_O]
):
    r"""Represents an "aliased" form of a mapped class for usage with Query.

    The ORM equivalent of a :func:`~sqlalchemy.sql.expression.alias`
    construct, this object mimics the mapped class using a
    ``__getattr__`` scheme and maintains a reference to a
    real :class:`~sqlalchemy.sql.expression.Alias` object.

    A primary purpose of :class:`.AliasedClass` is to serve as an alternate
    within a SQL statement generated by the ORM, such that an existing
    mapped entity can be used in multiple contexts.   A simple example::

        # find all pairs of users with the same name
        user_alias = aliased(User)
        session.query(User, user_alias).\
                        join((user_alias, User.id > user_alias.id)).\
                        filter(User.name == user_alias.name)

    :class:`.AliasedClass` is also capable of mapping an existing mapped
    class to an entirely new selectable, provided this selectable is column-
    compatible with the existing mapped selectable, and it can also be
    configured in a mapping as the target of a :func:`_orm.relationship`.
    See the links below for examples.

    The :class:`.AliasedClass` object is constructed typically using the
    :func:`_orm.aliased` function.   It also is produced with additional
    configuration when using the :func:`_orm.with_polymorphic` function.

    The resulting object is an instance of :class:`.AliasedClass`.
    This object implements an attribute scheme which produces the
    same attribute and method interface as the original mapped
    class, allowing :class:`.AliasedClass` to be compatible
    with any attribute technique which works on the original class,
    including hybrid attributes (see :ref:`hybrids_toplevel`).

    The :class:`.AliasedClass` can be inspected for its underlying
    :class:`_orm.Mapper`, aliased selectable, and other information
    using :func:`_sa.inspect`::

        from sqlalchemy import inspect
        my_alias = aliased(MyClass)
        insp = inspect(my_alias)

    The resulting inspection object is an instance of :class:`.AliasedInsp`.


    .. seealso::

        :func:`.aliased`

        :func:`.with_polymorphic`

        :ref:`relationship_aliased_class`

        :ref:`relationship_to_window_function`


    """

    __name__: str

    def __init__(
        self,
        mapped_class_or_ac: _EntityType[_O],
        alias: Optional[FromClause] = None,
        name: Optional[str] = None,
        flat: bool = False,
        adapt_on_names: bool = False,
        with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]] = None,
        with_polymorphic_discriminator: Optional[ColumnElement[Any]] = None,
        base_alias: Optional[AliasedInsp[Any]] = None,
        use_mapper_path: bool = False,
        represents_outer_join: bool = False,
    ):
        """Inspect ``mapped_class_or_ac`` and create the
        :class:`.AliasedInsp` that backs this alias.

        :param mapped_class_or_ac: the entity to alias; it is passed to
         ``inspection.inspect()``.
        :param alias: FROM clause to use; when ``None`` one is generated
         from the inspected entity using ``name`` and ``flat``.
        :param with_polymorphic_mappers: falls back to the mapper's own
         ``with_polymorphic_mappers`` when empty or ``None``.
        :param with_polymorphic_discriminator: falls back to the mapper's
         ``polymorphic_on`` when ``None``.

        The remaining arguments are handed to :class:`.AliasedInsp`
        unchanged.

        """
        insp = cast(
            "_InternalEntityType[_O]", inspection.inspect(mapped_class_or_ac)
        )
        mapper = insp.mapper

        nest_adapters = False

        if alias is None:
            if insp.is_aliased_class and insp.selectable._is_subquery:
                # aliasing an alias that is itself a subquery: alias that
                # subquery rather than the mapper's selectable
                alias = insp.selectable.alias()
            else:
                alias = (
                    mapper._with_polymorphic_selectable._anonymous_fromclause(
                        name=name,
                        flat=flat,
                    )
                )
        elif insp.is_aliased_class:
            # explicit selectable given for an entity that is already an
            # alias
            nest_adapters = True

        assert alias is not None
        self._aliased_insp = AliasedInsp(
            self,
            insp,
            alias,
            name,
            (
                with_polymorphic_mappers
                if with_polymorphic_mappers
                else mapper.with_polymorphic_mappers
            ),
            (
                with_polymorphic_discriminator
                if with_polymorphic_discriminator is not None
                else mapper.polymorphic_on
            ),
            base_alias,
            use_mapper_path,
            adapt_on_names,
            represents_outer_join,
            nest_adapters,
        )

        self.__name__ = f"aliased({mapper.class_.__name__})"

    @classmethod
    def _reconstitute_from_aliased_insp(
        cls, aliased_insp: AliasedInsp[_O]
    ) -> AliasedClass[_O]:
        """Create an :class:`.AliasedClass` around an existing
        :class:`.AliasedInsp` without running ``__init__``.

        For a with-polymorphic inspection object, each sub-entity other
        than ``aliased_insp`` itself is reconstituted as well and attached
        as an attribute named after its class.

        """
        obj = cls.__new__(cls)
        obj.__name__ = f"aliased({aliased_insp.mapper.class_.__name__})"
        obj._aliased_insp = aliased_insp

        if aliased_insp._is_with_polymorphic:
            for sub_aliased_insp in aliased_insp._with_polymorphic_entities:
                if sub_aliased_insp is not aliased_insp:
                    ent = AliasedClass._reconstitute_from_aliased_insp(
                        sub_aliased_insp
                    )
                    setattr(obj, sub_aliased_insp.class_.__name__, ent)

        return obj

    def __getattr__(self, key: str) -> Any:
        """Resolve ``key`` against the aliased target, adapting the result
        to this alias where applicable.

        :raises AttributeError: if this object has no ``_aliased_insp`` yet
         (for example when created with ``__new__`` only), or if the target
         has no such attribute.

        """
        try:
            # read __dict__ directly; going through normal attribute access
            # would re-enter __getattr__ when the entry is missing
            _aliased_insp = self.__dict__["_aliased_insp"]
        except KeyError:
            # name the missing attribute and hide the internal KeyError,
            # which is an implementation detail of this lookup
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute {key!r}"
            ) from None
        else:
            target = _aliased_insp._target
            # maintain all getattr mechanics
            attr = getattr(target, key)

        # attribute is a method, that will be invoked against a
        # "self"; so just return a new method with the same function and
        # new self
        if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
            return types.MethodType(attr.__func__, self)

        # attribute is a descriptor, that will be invoked against a
        # "self"; so invoke the descriptor against this self
        if hasattr(attr, "__get__"):
            attr = attr.__get__(None, self)

        # attributes within the QueryableAttribute system will want this
        # to be invoked so the object can be adapted
        if hasattr(attr, "adapt_to_entity"):
            attr = attr.adapt_to_entity(_aliased_insp)
            # cache on the instance so later lookups bypass __getattr__
            setattr(self, key, attr)

        return attr

    def _get_from_serialized(
        self, key: str, mapped_class: _O, aliased_insp: AliasedInsp[_O]
    ) -> Any:
        """Variant of ``__getattr__`` that takes the mapped class and the
        inspection object explicitly and re-points ``aliased_insp`` at this
        object before adapting.
        """
        # this method is only used in terms of the
        # sqlalchemy.ext.serializer extension
        attr = getattr(mapped_class, key)
        if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
            return types.MethodType(attr.__func__, self)

        # attribute is a descriptor, that will be invoked against a
        # "self"; so invoke the descriptor against this self
        if hasattr(attr, "__get__"):
            attr = attr.__get__(None, self)

        # attributes within the QueryableAttribute system will want this
        # to be invoked so the object can be adapted
        if hasattr(attr, "adapt_to_entity"):
            aliased_insp._weak_entity = weakref.ref(self)
            attr = attr.adapt_to_entity(aliased_insp)
            setattr(self, key, attr)

        return attr

    def __repr__(self) -> str:
        return "<AliasedClass at 0x%x; %s>" % (
            id(self),
            self._aliased_insp._target.__name__,
        )

    def __str__(self) -> str:
        return str(self._aliased_insp)

853 

854 

855@inspection._self_inspects 

856class AliasedInsp( 

857 ORMEntityColumnsClauseRole[_O], 

858 ORMFromClauseRole, 

859 HasCacheKey, 

860 InspectionAttr, 

861 MemoizedSlots, 

862 inspection.Inspectable["AliasedInsp[_O]"], 

863 Generic[_O], 

864): 

865 """Provide an inspection interface for an 

866 :class:`.AliasedClass` object. 

867 

868 The :class:`.AliasedInsp` object is returned 

869 given an :class:`.AliasedClass` using the 

870 :func:`_sa.inspect` function:: 

871 

872 from sqlalchemy import inspect 

873 from sqlalchemy.orm import aliased 

874 

875 my_alias = aliased(MyMappedClass) 

876 insp = inspect(my_alias) 

877 

878 Attributes on :class:`.AliasedInsp` 

879 include: 

880 

881 * ``entity`` - the :class:`.AliasedClass` represented. 

882 * ``mapper`` - the :class:`_orm.Mapper` mapping the underlying class. 

883 * ``selectable`` - the :class:`_expression.Alias` 

884 construct which ultimately 

885 represents an aliased :class:`_schema.Table` or 

886 :class:`_expression.Select` 

887 construct. 

888 * ``name`` - the name of the alias. Also is used as the attribute 

889 name when returned in a result tuple from :class:`_query.Query`. 

890 * ``with_polymorphic_mappers`` - collection of :class:`_orm.Mapper` 

891 objects 

892 indicating all those mappers expressed in the select construct 

893 for the :class:`.AliasedClass`. 

894 * ``polymorphic_on`` - an alternate column or SQL expression which 

895 will be used as the "discriminator" for a polymorphic load. 

896 

897 .. seealso:: 

898 

899 :ref:`inspection_toplevel` 

900 

901 """ 

902 

903 __slots__ = ( 

904 "__weakref__", 

905 "_weak_entity", 

906 "mapper", 

907 "selectable", 

908 "name", 

909 "_adapt_on_names", 

910 "with_polymorphic_mappers", 

911 "polymorphic_on", 

912 "_use_mapper_path", 

913 "_base_alias", 

914 "represents_outer_join", 

915 "persist_selectable", 

916 "local_table", 

917 "_is_with_polymorphic", 

918 "_with_polymorphic_entities", 

919 "_adapter", 

920 "_target", 

921 "__clause_element__", 

922 "_memoized_values", 

923 "_all_column_expressions", 

924 "_nest_adapters", 

925 ) 

926 

927 _cache_key_traversal = [ 

928 ("name", visitors.ExtendedInternalTraversal.dp_string), 

929 ("_adapt_on_names", visitors.ExtendedInternalTraversal.dp_boolean), 

930 ("_use_mapper_path", visitors.ExtendedInternalTraversal.dp_boolean), 

931 ("_target", visitors.ExtendedInternalTraversal.dp_inspectable), 

932 ("selectable", visitors.ExtendedInternalTraversal.dp_clauseelement), 

933 ( 

934 "with_polymorphic_mappers", 

935 visitors.InternalTraversal.dp_has_cache_key_list, 

936 ), 

937 ("polymorphic_on", visitors.InternalTraversal.dp_clauseelement), 

938 ] 

939 

940 mapper: Mapper[_O] 

941 selectable: FromClause 

942 _adapter: ORMAdapter 

943 with_polymorphic_mappers: Sequence[Mapper[Any]] 

944 _with_polymorphic_entities: Sequence[AliasedInsp[Any]] 

945 

946 _weak_entity: weakref.ref[AliasedClass[_O]] 

947 """the AliasedClass that refers to this AliasedInsp""" 

948 

949 _target: Union[Type[_O], AliasedClass[_O]] 

950 """the thing referenced by the AliasedClass/AliasedInsp. 

951 

952 In the vast majority of cases, this is the mapped class. However 

953 it may also be another AliasedClass (alias of alias). 

954 

955 """ 

956 

def __init__(
    self,
    entity: AliasedClass[_O],
    inspected: _InternalEntityType[_O],
    selectable: FromClause,
    name: Optional[str],
    with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]],
    polymorphic_on: Optional[ColumnElement[Any]],
    _base_alias: Optional[AliasedInsp[Any]],
    _use_mapper_path: bool,
    adapt_on_names: bool,
    represents_outer_join: bool,
    nest_adapters: bool,
):
    """Set up the inspection state for an :class:`.AliasedClass`.

    :param entity: the :class:`.AliasedClass` this object describes; it
     is stored only as a weak reference.
    :param inspected: inspected form of the aliased target; its
     ``.entity`` and ``.mapper`` are read here.
    :param selectable: the FROM element; it is assigned to
     ``selectable``, ``persist_selectable`` and ``local_table`` alike.
    :param name: stored as ``self.name``.
    :param with_polymorphic_mappers: when non-empty, a sub-entity is
     created for each mapper other than the primary one.
    :param polymorphic_on: stored as ``self.polymorphic_on``.
    :param _base_alias: the owning :class:`.AliasedInsp`; when falsy,
     this object is its own base alias.
    :param _use_mapper_path: stored, and forwarded to sub-entities.
    :param adapt_on_names: stored, and forwarded to sub-entities and to
     the :class:`.ORMAdapter`.
    :param represents_outer_join: stored as given.
    :param nest_adapters: when true, ``inspected`` must itself be an
     :class:`.AliasedInsp` and its adapter wraps the new adapter.

    """
    mapped_class_or_ac = inspected.entity
    mapper = inspected.mapper

    # the AliasedClass is held weakly; see the ``entity`` property
    self._weak_entity = weakref.ref(entity)
    self.mapper = mapper
    self.selectable = self.persist_selectable = self.local_table = (
        selectable
    )
    self.name = name
    self.polymorphic_on = polymorphic_on
    # the base alias is likewise held weakly, defaulting to ourselves
    self._base_alias = weakref.ref(_base_alias or self)
    self._use_mapper_path = _use_mapper_path
    self.represents_outer_join = represents_outer_join
    self._nest_adapters = nest_adapters

    if with_polymorphic_mappers:
        self._is_with_polymorphic = True
        self.with_polymorphic_mappers = with_polymorphic_mappers
        self._with_polymorphic_entities = []
        for poly in self.with_polymorphic_mappers:
            if poly is not mapper:
                # each additional mapper gets its own AliasedClass that
                # shares our selectable and names us as its base alias
                ent = AliasedClass(
                    poly.class_,
                    selectable,
                    base_alias=self,
                    adapt_on_names=adapt_on_names,
                    use_mapper_path=_use_mapper_path,
                )

                # make the sub-entity reachable as an attribute of the
                # owning AliasedClass, named after the mapped class
                setattr(self.entity, poly.class_.__name__, ent)
                self._with_polymorphic_entities.append(ent._aliased_insp)

    else:
        # note: ``_with_polymorphic_entities`` is not assigned on this
        # branch
        self._is_with_polymorphic = False
        self.with_polymorphic_mappers = [mapper]

    self._adapter = ORMAdapter(
        _TraceAdaptRole.ALIASED_INSP,
        mapper,
        selectable=selectable,
        equivalents=mapper._equivalent_columns,
        adapt_on_names=adapt_on_names,
        anonymize_labels=True,
        # make sure the adapter doesn't try to grab other tables that
        # are not even the thing we are mapping, such as embedded
        # selectables in subqueries or CTEs.  See issue #6060
        adapt_from_selectables={
            m.selectable
            for m in self.with_polymorphic_mappers
            if not adapt_on_names
        },
        limit_on_entity=False,
    )

    if nest_adapters:
        # supports "aliased class of aliased class" use case
        assert isinstance(inspected, AliasedInsp)
        self._adapter = inspected._adapter.wrap(self._adapter)

    self._adapt_on_names = adapt_on_names
    self._target = mapped_class_or_ac

1032 

@classmethod
def _alias_factory(
    cls,
    element: Union[_EntityType[_O], FromClause],
    alias: Optional[FromClause] = None,
    name: Optional[str] = None,
    flat: bool = False,
    adapt_on_names: bool = False,
) -> Union[AliasedClass[_O], FromClause]:
    """Produce an alias of either an ORM entity or a plain FROM clause.

    Anything that is not a :class:`.FromClause` is wrapped in an
    :class:`.AliasedClass`.  A :class:`.FromClause` is aliased directly:
    by name when ``name`` is given, otherwise anonymously.

    :raises sqlalchemy.exc.ArgumentError: if ``adapt_on_names`` is set
     for a plain :class:`.FromClause`.

    """
    if not isinstance(element, FromClause):
        return AliasedClass(
            element,
            alias=alias,
            flat=flat,
            name=name,
            adapt_on_names=adapt_on_names,
        )

    if adapt_on_names:
        raise sa_exc.ArgumentError(
            "adapt_on_names only applies to ORM elements"
        )

    if not name:
        return coercions.expect(
            roles.AnonymizedFromClauseRole, element, flat=flat
        )
    return element.alias(name=name, flat=flat)

1061 

@classmethod
def _with_polymorphic_factory(
    cls,
    base: Union[Type[_O], Mapper[_O]],
    classes: Union[Literal["*"], Iterable[_EntityType[Any]]],
    selectable: Union[Literal[False, None], FromClause] = False,
    flat: bool = False,
    polymorphic_on: Optional[ColumnElement[Any]] = None,
    aliased: bool = False,
    innerjoin: bool = False,
    adapt_on_names: bool = False,
    name: Optional[str] = None,
    _use_mapper_path: bool = False,
) -> AliasedClass[_O]:
    """Build the :class:`.AliasedClass` behind ``with_polymorphic()``.

    The mappers and selectable come from the primary mapper's
    ``_with_polymorphic_args()``; the selectable is made anonymous when
    ``aliased`` or ``flat`` is requested.

    :raises sqlalchemy.exc.ArgumentError: if both ``flat`` and an
     explicit ``selectable`` are passed.

    """
    primary_mapper = _class_to_mapper(base)

    has_explicit_selectable = selectable not in (None, False)
    if has_explicit_selectable and flat:
        raise sa_exc.ArgumentError(
            "the 'flat' and 'selectable' arguments cannot be passed "
            "simultaneously to with_polymorphic()"
        )

    poly_mappers, poly_selectable = primary_mapper._with_polymorphic_args(
        classes, selectable, innerjoin=innerjoin
    )
    if aliased or flat:
        assert poly_selectable is not None
        poly_selectable = poly_selectable._anonymous_fromclause(flat=flat)

    # an inner join can never stand in for an outer join
    is_outer = not innerjoin
    return AliasedClass(
        base,
        poly_selectable,
        name=name,
        with_polymorphic_mappers=poly_mappers,
        adapt_on_names=adapt_on_names,
        with_polymorphic_discriminator=polymorphic_on,
        use_mapper_path=_use_mapper_path,
        represents_outer_join=is_outer,
    )

1101 

@property
def entity(self) -> AliasedClass[_O]:
    """Return the :class:`.AliasedClass` that this object inspects.

    The class is only weakly referenced in order to avoid reference
    cycles, so it can disappear, for instance when it was created
    internally and only this :class:`.AliasedInsp` was handed around.
    In that case a replacement is rebuilt from this object and a new
    weak reference to it is stored.

    """
    existing = self._weak_entity()
    if existing is not None:
        return existing

    rebuilt = AliasedClass._reconstitute_from_aliased_insp(self)
    self._weak_entity = weakref.ref(rebuilt)
    return rebuilt

1115 

1116 is_aliased_class = True 

1117 "always returns True" 

1118 

1119 def _memoized_method___clause_element__(self) -> FromClause: 

1120 return self.selectable._annotate( 

1121 { 

1122 "parentmapper": self.mapper, 

1123 "parententity": self, 

1124 "entity_namespace": self, 

1125 } 

1126 )._set_propagate_attrs( 

1127 {"compile_state_plugin": "orm", "plugin_subject": self} 

1128 ) 

1129 

@property
def entity_namespace(self) -> AliasedClass[_O]:
    """Return the :class:`.AliasedClass`; same object as ``entity``."""
    namespace = self.entity
    return namespace

1133 

@property
def class_(self) -> Type[_O]:
    """Return the mapped class ultimately represented by this
    :class:`.AliasedInsp`, as reported by its mapper."""
    mapped_class = self.mapper.class_
    return mapped_class

1139 

1140 @property 

1141 def _path_registry(self) -> AbstractEntityRegistry: 

1142 if self._use_mapper_path: 

1143 return self.mapper._path_registry 

1144 else: 

1145 return PathRegistry.per_mapper(self) 

1146 

1147 def __getstate__(self) -> Dict[str, Any]: 

1148 return { 

1149 "entity": self.entity, 

1150 "mapper": self.mapper, 

1151 "alias": self.selectable, 

1152 "name": self.name, 

1153 "adapt_on_names": self._adapt_on_names, 

1154 "with_polymorphic_mappers": self.with_polymorphic_mappers, 

1155 "with_polymorphic_discriminator": self.polymorphic_on, 

1156 "base_alias": self._base_alias(), 

1157 "use_mapper_path": self._use_mapper_path, 

1158 "represents_outer_join": self.represents_outer_join, 

1159 "nest_adapters": self._nest_adapters, 

1160 } 

1161 

1162 def __setstate__(self, state: Dict[str, Any]) -> None: 

1163 self.__init__( # type: ignore 

1164 state["entity"], 

1165 state["mapper"], 

1166 state["alias"], 

1167 state["name"], 

1168 state["with_polymorphic_mappers"], 

1169 state["with_polymorphic_discriminator"], 

1170 state["base_alias"], 

1171 state["use_mapper_path"], 

1172 state["adapt_on_names"], 

1173 state["represents_outer_join"], 

1174 state["nest_adapters"], 

1175 ) 

1176 

def _merge_with(self, other: AliasedInsp[_O]) -> AliasedInsp[_O]:
    """Combine the polymorphic class sets of this entity and ``other``.

    Both must share the same primary mapper.  When the two class sets
    are equal, ``other`` is returned unchanged.  Otherwise a new
    with-polymorphic :class:`.AliasedInsp` over the union is built on a
    flat anonymous selectable, taking its discriminator, mapper-path
    flag and join type from ``other``.

    """
    primary_mapper = other.mapper
    assert self.mapper is primary_mapper

    own_classes = util.to_set(
        mp.class_ for mp in self.with_polymorphic_mappers
    )
    incoming_classes = {mp.class_ for mp in other.with_polymorphic_mappers}
    if own_classes == incoming_classes:
        return other

    merged_classes = own_classes.union(incoming_classes)
    mappers, selectable = primary_mapper._with_polymorphic_args(
        merged_classes, None, innerjoin=not other.represents_outer_join
    )
    flat_selectable = selectable._anonymous_fromclause(flat=True)

    merged = AliasedClass(
        primary_mapper,
        flat_selectable,
        with_polymorphic_mappers=mappers,
        with_polymorphic_discriminator=other.polymorphic_on,
        use_mapper_path=other._use_mapper_path,
        represents_outer_join=other.represents_outer_join,
    )
    return merged._aliased_insp

1206 

def _adapt_element(
    self, expr: _ORMCOLEXPR, key: Optional[str] = None
) -> _ORMCOLEXPR:
    """Adapt a column expression to this entity's selectable.

    The expression is passed through the entity's adapter, annotated
    with this entity and its mapper (plus ``proxy_key`` when a truthy
    ``key`` is given), and tagged for the "orm" compile state plugin.

    """
    assert isinstance(expr, ColumnElement)

    annotations: Dict[str, Any] = {
        "parententity": self,
        "parentmapper": self.mapper,
    }
    if key:
        annotations["proxy_key"] = key

    adapted = self._adapter.traverse(expr)
    annotated = adapted._annotate(annotations)
    return annotated._set_propagate_attrs(
        {"compile_state_plugin": "orm", "plugin_subject": self}
    )

if TYPE_CHECKING:
    # typing-only stub: establishes compatibility with the
    # _ORMAdapterProto protocol, which in turn is compatible with
    # _CoreAdapterProto.

    def _orm_adapt_element(
        self,
        obj: _CE,
        key: Optional[str] = None,
    ) -> _CE: ...

else:
    # at runtime the protocol name is simply another name for
    # _adapt_element
    _orm_adapt_element = _adapt_element

1240 

1241 def _entity_for_mapper(self, mapper): 

1242 self_poly = self.with_polymorphic_mappers 

1243 if mapper in self_poly: 

1244 if mapper is self.mapper: 

1245 return self 

1246 else: 

1247 return getattr( 

1248 self.entity, mapper.class_.__name__ 

1249 )._aliased_insp 

1250 elif mapper.isa(self.mapper): 

1251 return self 

1252 else: 

1253 assert False, "mapper %s doesn't correspond to %s" % (mapper, self) 

1254 

1255 def _memoized_attr__get_clause(self): 

1256 onclause, replacemap = self.mapper._get_clause 

1257 return ( 

1258 self._adapter.traverse(onclause), 

1259 { 

1260 self._adapter.traverse(col): param 

1261 for col, param in replacemap.items() 

1262 }, 

1263 ) 

1264 

1265 def _memoized_attr__memoized_values(self): 

1266 return {} 

1267 

def _memoized_attr__all_column_expressions(self):
    """Return a :class:`.ColumnCollection` of this entity's columns.

    The (key, column) pairs come from the mapper's
    ``_columns_plus_keys()``, which is also given the sub-entity
    mappers when this is a with-polymorphic entity.  Each column is
    adapted through ``_adapt_element()``.

    """
    if not self._is_with_polymorphic:
        raw_pairs = self.mapper._columns_plus_keys()
    else:
        sub_mappers = [ent.mapper for ent in self._with_polymorphic_entities]
        raw_pairs = self.mapper._columns_plus_keys(sub_mappers)

    adapted_pairs = []
    for key, col in raw_pairs:
        adapted_pairs.append((key, self._adapt_element(col)))

    return ColumnCollection(adapted_pairs)

1281 

1282 def _memo(self, key, callable_, *args, **kw): 

1283 if key in self._memoized_values: 

1284 return self._memoized_values[key] 

1285 else: 

1286 self._memoized_values[key] = value = callable_(*args, **kw) 

1287 return value 

1288 

1289 def __repr__(self): 

1290 if self.with_polymorphic_mappers: 

1291 with_poly = "(%s)" % ", ".join( 

1292 mp.class_.__name__ for mp in self.with_polymorphic_mappers 

1293 ) 

1294 else: 

1295 with_poly = "" 

1296 return "<AliasedInsp at 0x%x; %s%s>" % ( 

1297 id(self), 

1298 self.class_.__name__, 

1299 with_poly, 

1300 ) 

1301 

1302 def __str__(self): 

1303 if self._is_with_polymorphic: 

1304 return "with_polymorphic(%s, [%s])" % ( 

1305 self._target.__name__, 

1306 ", ".join( 

1307 mp.class_.__name__ 

1308 for mp in self.with_polymorphic_mappers 

1309 if mp is not self.mapper 

1310 ), 

1311 ) 

1312 else: 

1313 return "aliased(%s)" % (self._target.__name__,) 

1314 

1315 

class _WrapUserEntity:
    """Proxy handed to ``loader_criteria`` lambdas in place of the entity.

    Attribute access is forwarded to the wrapped subject, except that a
    ``declared_attr`` found directly in the subject's ``__dict__`` is
    invoked through its ``fget`` rather than through the descriptor
    protocol.  That bypasses the warning such descriptors normally emit
    when used on unmapped mixins.

    The wrapper might also serve other per-lambda instrumentation
    should the need arise.

    """

    __slots__ = ("subject",)

    def __init__(self, subject):
        self.subject = subject

    @util.preload_module("sqlalchemy.orm.decl_api")
    def __getattribute__(self, name):
        declared_attr = util.preloaded.orm.decl_api.declared_attr

        # go through object.__getattribute__ so that fetching the
        # subject does not re-enter this method
        wrapped = object.__getattribute__(self, "subject")
        own_attrs = wrapped.__dict__

        if name not in own_attrs:
            return getattr(wrapped, name)

        candidate = own_attrs[name]
        if isinstance(candidate, declared_attr):
            return candidate.fget(wrapped)
        return getattr(wrapped, name)

1342 

1343 

class LoaderCriteriaOption(CriteriaOption):
    """Add additional WHERE criteria to the load for all occurrences of
    a particular entity.

    :class:`_orm.LoaderCriteriaOption` is invoked using the
    :func:`_orm.with_loader_criteria` function; see that function for
    details.

    .. versionadded:: 1.4

    """

    __slots__ = (
        "root_entity",
        "entity",
        "deferred_where_criteria",
        "where_criteria",
        "_where_crit_orig",
        "include_aliases",
        "propagate_to_loaders",
    )

    _traverse_internals = [
        ("root_entity", visitors.ExtendedInternalTraversal.dp_plain_obj),
        ("entity", visitors.ExtendedInternalTraversal.dp_has_cache_key),
        ("where_criteria", visitors.InternalTraversal.dp_clauseelement),
        ("include_aliases", visitors.InternalTraversal.dp_boolean),
        ("propagate_to_loaders", visitors.InternalTraversal.dp_boolean),
    ]

    root_entity: Optional[Type[Any]]
    entity: Optional[_InternalEntityType[Any]]
    where_criteria: Union[ColumnElement[bool], lambdas.DeferredLambdaElement]
    deferred_where_criteria: bool
    include_aliases: bool
    propagate_to_loaders: bool

    _where_crit_orig: Any

    def __init__(
        self,
        entity_or_base: _EntityType[Any],
        where_criteria: Union[
            _ColumnExpressionArgument[bool],
            Callable[[Any], _ColumnExpressionArgument[bool]],
        ],
        loader_only: bool = False,
        include_aliases: bool = False,
        propagate_to_loaders: bool = True,
        track_closure_variables: bool = True,
    ):
        """Construct the option.

        ``entity_or_base`` is inspected without raising.  When it is
        inspectable, the result is kept as ``entity``; otherwise the
        argument itself is kept as ``root_entity``.  A callable
        ``where_criteria`` becomes a deferred lambda element that
        receives the entity wrapped in ``_WrapUserEntity``; anything
        else is coerced to a WHERE/HAVING expression right away.

        """
        inspected = cast(
            "_InternalEntityType[Any]",
            inspection.inspect(entity_or_base, False),
        )
        self.entity = inspected
        self.root_entity = (
            cast("Type[Any]", entity_or_base) if inspected is None else None
        )

        # kept unmodified so that __reduce__ can replay it
        self._where_crit_orig = where_criteria

        if not callable(where_criteria):
            self.deferred_where_criteria = False
            self.where_criteria = coercions.expect(
                roles.WhereHavingRole, where_criteria
            )
        else:
            if self.root_entity is None:
                assert inspected is not None
                wrap_entity = inspected.entity
            else:
                wrap_entity = self.root_entity

            self.deferred_where_criteria = True
            lambda_options = lambdas.LambdaOptions(
                track_closure_variables=track_closure_variables
            )
            self.where_criteria = lambdas.DeferredLambdaElement(
                where_criteria,
                roles.WhereHavingRole,
                lambda_args=(_WrapUserEntity(wrap_entity),),
                opts=lambda_options,
            )

        self.include_aliases = include_aliases
        self.propagate_to_loaders = propagate_to_loaders

    @classmethod
    def _unreduce(
        cls, entity, where_criteria, include_aliases, propagate_to_loaders
    ):
        """Rebuild an option from the arguments ``__reduce__`` emits."""
        rebuilt = LoaderCriteriaOption(
            entity,
            where_criteria,
            include_aliases=include_aliases,
            propagate_to_loaders=propagate_to_loaders,
        )
        return rebuilt

    def __reduce__(self):
        """Pickle as a call to ``_unreduce`` with the entity's class (or
        the root entity) and the original, uncoerced criteria."""
        if self.entity:
            target = self.entity.class_
        else:
            target = self.root_entity

        unreduce_args = (
            target,
            self._where_crit_orig,
            self.include_aliases,
            self.propagate_to_loaders,
        )
        return (LoaderCriteriaOption._unreduce, unreduce_args)

    def _all_mappers(self) -> Iterator[Mapper[Any]]:
        """Yield every mapper this option applies to.

        For an inspected entity these are its mapper and that mapper's
        descendants.  For a root entity the subclass tree is walked
        breadth-first: an inspectable subclass contributes its mapper
        and descendants, any other subclass has its own subclasses
        queued for a later visit.

        """
        if self.entity:
            yield from self.entity.mapper.self_and_descendants
            return

        assert self.root_entity
        pending = list(self.root_entity.__subclasses__())
        while pending:
            candidate = pending.pop(0)
            inspected = cast(
                "_InternalEntityType[Any]",
                inspection.inspect(candidate, raiseerr=False),
            )
            if inspected:
                yield from inspected.mapper.self_and_descendants
            else:
                pending.extend(candidate.__subclasses__())

    def _should_include(self, compile_state: ORMCompileState) -> bool:
        """Return False only when the statement being compiled carries
        this same option in its ``for_loader_criteria`` annotation."""
        marker = compile_state.select_statement._annotations.get(
            "for_loader_criteria", None
        )
        return marker is not self

    def _resolve_where_criteria(
        self, ext_info: _InternalEntityType[Any]
    ) -> ColumnElement[bool]:
        """Return the criteria for ``ext_info``, deep-annotated with
        ``for_loader_criteria`` pointing at this option.

        Deferred criteria are resolved against ``ext_info.entity``
        first.

        """
        if not self.deferred_where_criteria:
            crit = self.where_criteria  # type: ignore
        else:
            crit = cast(
                "ColumnElement[bool]",
                self.where_criteria._resolve_with_args(ext_info.entity),
            )
        assert isinstance(crit, ColumnElement)

        marker = {"for_loader_criteria": self}
        return sql_util._deep_annotate(
            crit,
            marker,
            detect_subquery_cols=True,
            ind_cols_on_fromclause=True,
        )

    def process_compile_state_replaced_entities(
        self,
        compile_state: ORMCompileState,
        mapper_entities: Iterable[_MapperEntity],
    ) -> None:
        """Delegate to :meth:`process_compile_state`; the replaced
        entities are not consulted."""
        self.process_compile_state(compile_state)

    def process_compile_state(self, compile_state: ORMCompileState) -> None:
        """Apply a modification to a given :class:`.CompileState`."""

        # if options to limit the criteria to immediate query only,
        # use compile_state.attributes instead

        global_attributes = compile_state.global_attributes
        self.get_global_criteria(global_attributes)

    def get_global_criteria(self, attributes: Dict[Any, Any]) -> None:
        """Append this option to the ``additional_entity_criteria`` list
        kept in ``attributes`` for each applicable mapper."""
        for mp in self._all_mappers():
            criteria_key = ("additional_entity_criteria", mp)
            attributes.setdefault(criteria_key, []).append(self)

1521 

1522 

# register an inspection hook for AliasedClass: the registered callable
# returns the ``_aliased_insp`` attribute of the object it is given.
inspection._inspects(AliasedClass)(lambda target: target._aliased_insp)

1524 

1525 

@inspection._inspects(type)
def _inspect_mc(
    class_: Type[_O],
) -> Optional[Mapper[_O]]:
    """Inspection hook for plain classes.

    Returns the class's mapper, or ``None`` when the class has no class
    manager, is not mapped, or the lookup raises ``exc.NO_STATE``.

    """
    try:
        manager = opt_manager_of_class(class_)
        if manager is not None and manager.is_mapped:
            return manager.mapper
    except exc.NO_STATE:
        pass
    return None

1539 

1540 

# the runtime type of a subscripted generic such as ``List[Any]``
GenericAlias = type(List[Any])


@inspection._inspects(GenericAlias)
def _inspect_generic_alias(
    class_: Type[_O],
) -> Optional[Mapper[_O]]:
    """Inspection hook for subscripted generics: inspect the origin
    class of the alias with :func:`_inspect_mc`."""
    return _inspect_mc(cast("Type[_O]", typing_get_origin(class_)))

1550 

1551 

@inspection._self_inspects
class Bundle(
    ORMColumnsClauseRole[_T],
    SupportsCloneAnnotations,
    MemoizedHasCacheKey,
    inspection.Inspectable["Bundle[_T]"],
    InspectionAttr,
):
    """A grouping of SQL expressions that are returned by a :class:`.Query`
    under one namespace.

    The :class:`.Bundle` essentially allows nesting of the tuple-based
    results returned by a column-oriented :class:`_query.Query` object.
    It also
    is extensible via simple subclassing, where the primary capability
    to override is that of how the set of expressions should be returned,
    allowing post-processing as well as custom return types, without
    involving ORM identity-mapped classes.

    .. seealso::

        :ref:`bundles`


    """

    single_entity = False
    """If True, queries for a single Bundle will be returned as a single
    entity, rather than an element within a keyed tuple."""

    is_clause_element = False

    is_mapper = False

    is_aliased_class = False

    is_bundle = True

    _propagate_attrs: _PropagateAttrsType = util.immutabledict()

    proxy_set = util.EMPTY_SET  # type: ignore

    exprs: List[_ColumnsClauseElement]

    def __init__(
        self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
    ):
        r"""Construct a new :class:`.Bundle`.

        e.g.::

            bn = Bundle("mybundle", MyClass.x, MyClass.y)

            for row in session.query(bn).filter(
                    bn.c.x == 5).filter(bn.c.y == 4):
                print(row.mybundle.x, row.mybundle.y)

        :param name: name of the bundle.
        :param \*exprs: columns or SQL expressions comprising the bundle.
        :param single_entity=False: if True, rows for this :class:`.Bundle`
         can be returned as a "single entity" outside of any enclosing tuple
         in the same manner as a mapped entity.

        """
        self.name = self._label = name
        coerced_exprs = [
            coercions.expect(
                roles.ColumnsClauseRole, expr, apply_propagate_attrs=self
            )
            for expr in exprs
        ]
        self.exprs = coerced_exprs

        # each member is keyed by its ``key``, falling back to ``_label``
        # (the latter is what a nested Bundle provides)
        self.c = self.columns = ColumnCollection(
            (getattr(col, "key", col._label), col)
            for col in [e._annotations.get("bundle", e) for e in coerced_exprs]
        ).as_readonly()
        self.single_entity = kw.pop("single_entity", self.single_entity)

    def _gen_cache_key(
        self, anon_map: anon_map, bindparams: List[BindParameter[Any]]
    ) -> Tuple[Any, ...]:
        """Return a cache key made of the class, name, ``single_entity``
        flag and the cache key of every member expression."""
        return (self.__class__, self.name, self.single_entity) + tuple(
            [expr._gen_cache_key(anon_map, bindparams) for expr in self.exprs]
        )

    @property
    def mapper(self) -> Optional[Mapper[Any]]:
        """The ``parentmapper`` annotation of the first expression, or
        None if that expression has none."""
        mp: Optional[Mapper[Any]] = self.exprs[0]._annotations.get(
            "parentmapper", None
        )
        return mp

    @property
    def entity(self) -> Optional[_InternalEntityType[Any]]:
        """The ``parententity`` annotation of the first expression, or
        None if that expression has none."""
        ie: Optional[_InternalEntityType[Any]] = self.exprs[
            0
        ]._annotations.get("parententity", None)
        return ie

    @property
    def entity_namespace(
        self,
    ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]:
        """The namespace used for attribute lookup; same as ``.c``."""
        return self.c

    columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]

    """A namespace of SQL expressions referred to by this :class:`.Bundle`.

        e.g.::

            bn = Bundle("mybundle", MyClass.x, MyClass.y)

            q = sess.query(bn).filter(bn.c.x == 5)

        Nesting of bundles is also supported::

            b1 = Bundle("b1",
                    Bundle('b2', MyClass.a, MyClass.b),
                    Bundle('b3', MyClass.x, MyClass.y)
                )

            q = sess.query(b1).filter(
                b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9)

    .. seealso::

        :attr:`.Bundle.c`

    """

    c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
    """An alias for :attr:`.Bundle.columns`."""

    def _clone(self, **kw):
        """Return a shallow copy that shares this bundle's state; the
        constructor is not re-run."""
        cloned = self.__class__.__new__(self.__class__)
        cloned.__dict__.update(self.__dict__)
        return cloned

    def __clause_element__(self):
        """Return the member expressions as an ungrouped
        :class:`.ClauseList` annotated with this bundle."""
        # ensure existing entity_namespace remains
        annotations = {"bundle": self, "entity_namespace": self}
        annotations.update(self._annotations)

        plugin_subject = self.exprs[0]._propagate_attrs.get(
            "plugin_subject", self.entity
        )
        return (
            expression.ClauseList(
                _literal_as_text_role=roles.ColumnsClauseRole,
                group=False,
                *[e._annotations.get("bundle", e) for e in self.exprs],
            )
            ._annotate(annotations)
            ._set_propagate_attrs(
                # the Bundle *must* use the orm plugin no matter what.  the
                # subject can be None but it's much better if it's not.
                {
                    "compile_state_plugin": "orm",
                    "plugin_subject": plugin_subject,
                }
            )
        )

    @property
    def clauses(self):
        """The ``clauses`` of the :class:`.ClauseList` produced by
        ``__clause_element__()``."""
        return self.__clause_element__().clauses

    def label(self, name):
        """Provide a copy of this :class:`.Bundle` passing a new label."""

        cloned = self._clone()
        # keep ``_label`` in step with ``name``, as the constructor does.
        # ``_label`` is the key an enclosing Bundle uses for a nested
        # bundle, so a stale value would expose the relabeled bundle under
        # its old name in the parent's ``.c`` collection.
        cloned.name = cloned._label = name
        return cloned

    def create_row_processor(
        self,
        query: Select[Unpack[TupleAny]],
        procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]],
        labels: Sequence[str],
    ) -> Callable[[Row[Unpack[TupleAny]]], Any]:
        """Produce the "row processing" function for this :class:`.Bundle`.

        May be overridden by subclasses to provide custom behaviors when
        results are fetched. The method is passed the statement object and a
        set of "row processor" functions at query execution time; these
        processor functions when given a result row will return the individual
        attribute value, which can then be adapted into any kind of return data
        structure.

        The example below illustrates replacing the usual :class:`.Row`
        return structure with a straight Python dictionary::

            from sqlalchemy.orm import Bundle

            class DictBundle(Bundle):
                def create_row_processor(self, query, procs, labels):
                    '''Override create_row_processor to return values as
                    dictionaries'''

                    def proc(row):
                        return dict(
                            zip(labels, (proc(row) for proc in procs))
                        )
                    return proc

        A result from the above :class:`_orm.Bundle` will return dictionary
        values::

            bn = DictBundle('mybundle', MyClass.data1, MyClass.data2)
            stmt = select(bn).where(bn.c.data1 == 'd1')
            for row in session.execute(stmt):
                print(row.mybundle['data1'], row.mybundle['data2'])

        """
        # one empty "extra" tuple per label
        keyed_tuple = result_tuple(labels, [() for _ in labels])

        def proc(row: Row[Unpack[TupleAny]]) -> Any:
            return keyed_tuple([getter(row) for getter in procs])

        return proc

1773 

1774 

def _orm_annotate(element: _SA, exclude: Optional[Any] = None) -> _SA:
    """Return a deep copy of the given ClauseElement in which each
    element carries the "_orm_adapt" annotation.

    Members of the ``exclude`` collection are cloned as well, but are
    left without the annotation.

    """
    orm_adapt_flag = {"_orm_adapt": True}
    return sql_util._deep_annotate(element, orm_adapt_flag, exclude)

1783 

1784 

def _orm_deannotate(element: _SA) -> _SA:
    """Strip the annotations that tie a column to a particular mapping,
    namely "_orm_adapt" and "parententity".

    The "remote" and "foreign" annotations applied by the
    :func:`_orm.foreign` and :func:`_orm.remote` annotators are not
    affected.

    """
    mapping_annotations = ("_orm_adapt", "parententity")
    return sql_util._deep_deannotate(element, values=mapping_annotations)

1797 

1798 

def _orm_full_deannotate(element: _SA) -> _SA:
    """Deep-deannotate ``element`` without naming specific annotation
    keys."""
    deannotated = sql_util._deep_deannotate(element)
    return deannotated

1801 

1802 

1803class _ORMJoin(expression.Join): 

1804 """Extend Join to support ORM constructs as input.""" 

1805 

1806 __visit_name__ = expression.Join.__visit_name__ 

1807 

1808 inherit_cache = True 

1809 

1810 def __init__( 

1811 self, 

1812 left: _FromClauseArgument, 

1813 right: _FromClauseArgument, 

1814 onclause: Optional[_OnClauseArgument] = None, 

1815 isouter: bool = False, 

1816 full: bool = False, 

1817 _left_memo: Optional[Any] = None, 

1818 _right_memo: Optional[Any] = None, 

1819 _extra_criteria: Tuple[ColumnElement[bool], ...] = (), 

1820 ): 

1821 left_info = cast( 

1822 "Union[FromClause, _InternalEntityType[Any]]", 

1823 inspection.inspect(left), 

1824 ) 

1825 

1826 right_info = cast( 

1827 "Union[FromClause, _InternalEntityType[Any]]", 

1828 inspection.inspect(right), 

1829 ) 

1830 adapt_to = right_info.selectable 

1831 

1832 # used by joined eager loader 

1833 self._left_memo = _left_memo 

1834 self._right_memo = _right_memo 

1835 

1836 if isinstance(onclause, attributes.QueryableAttribute): 

1837 if TYPE_CHECKING: 

1838 assert isinstance( 

1839 onclause.comparator, RelationshipProperty.Comparator 

1840 ) 

1841 on_selectable = onclause.comparator._source_selectable() 

1842 prop = onclause.property 

1843 _extra_criteria += onclause._extra_criteria 

1844 elif isinstance(onclause, MapperProperty): 

1845 # used internally by joined eager loader...possibly not ideal 

1846 prop = onclause 

1847 on_selectable = prop.parent.selectable 

1848 else: 

1849 prop = None 

1850 on_selectable = None 

1851 

1852 left_selectable = left_info.selectable 

1853 if prop: 

1854 adapt_from: Optional[FromClause] 

1855 if sql_util.clause_is_present(on_selectable, left_selectable): 

1856 adapt_from = on_selectable 

1857 else: 

1858 assert isinstance(left_selectable, FromClause) 

1859 adapt_from = left_selectable 

1860 

1861 ( 

1862 pj, 

1863 sj, 

1864 source, 

1865 dest, 

1866 secondary, 

1867 target_adapter, 

1868 ) = prop._create_joins( 

1869 source_selectable=adapt_from, 

1870 dest_selectable=adapt_to, 

1871 source_polymorphic=True, 

1872 of_type_entity=right_info, 

1873 alias_secondary=True, 

1874 extra_criteria=_extra_criteria, 

1875 ) 

1876 

1877 if sj is not None: 

1878 if isouter: 

1879 # note this is an inner join from secondary->right 

1880 right = sql.join(secondary, right, sj) 

1881 onclause = pj 

1882 else: 

1883 left = sql.join(left, secondary, pj, isouter) 

1884 onclause = sj 

1885 else: 

1886 onclause = pj 

1887 

1888 self._target_adapter = target_adapter 

1889 

1890 # we don't use the normal coercions logic for _ORMJoin 

1891 # (probably should), so do some gymnastics to get the entity. 

1892 # logic here is for #8721, which was a major bug in 1.4 

1893 # for almost two years, not reported/fixed until 1.4.43 (!) 

1894 if is_selectable(left_info): 

1895 parententity = left_selectable._annotations.get( 

1896 "parententity", None 

1897 ) 

1898 elif insp_is_mapper(left_info) or insp_is_aliased_class(left_info): 

1899 parententity = left_info 

1900 else: 

1901 parententity = None 

1902 

1903 if parententity is not None: 

1904 self._annotations = self._annotations.union( 

1905 {"parententity": parententity} 

1906 ) 

1907 

1908 augment_onclause = bool(_extra_criteria) and not prop 

1909 expression.Join.__init__(self, left, right, onclause, isouter, full) 

1910 

1911 assert self.onclause is not None 

1912 

1913 if augment_onclause: 

1914 self.onclause &= sql.and_(*_extra_criteria) 

1915 

1916 if ( 

1917 not prop 

1918 and getattr(right_info, "mapper", None) 

1919 and right_info.mapper.single # type: ignore 

1920 ): 

1921 right_info = cast("_InternalEntityType[Any]", right_info) 

1922 # if single inheritance target and we are using a manual 

1923 # or implicit ON clause, augment it the same way we'd augment the 

1924 # WHERE. 

1925 single_crit = right_info.mapper._single_table_criterion 

1926 if single_crit is not None: 

1927 if insp_is_aliased_class(right_info): 

1928 single_crit = right_info._adapter.traverse(single_crit) 

1929 self.onclause = self.onclause & single_crit 

1930 

def _splice_into_center(self, other):
    """Merge this join with ``other``, which begins where this one ends.

    Given join(a, b) and join(b, c), return join(a, b).join(c); ``self``
    is the first of the two and ``other`` is the second.

    """
    # descend along the left side of ``other`` until reaching an element
    # that is not itself a Join
    cursor = other
    while isinstance(cursor, sql.Join):
        cursor = cursor.left

    # that element must be the very object on the right side of this join
    assert self.right is cursor

    # rebuild this join so that its right side is the left side of
    # ``other``, keeping our own ON clause, outer flag and left memo
    spliced_left = _ORMJoin(
        self.left,
        other.left,
        self.onclause,
        isouter=self.isouter,
        _left_memo=self._left_memo,
        _right_memo=None,
    )

    # then apply the right side / ON clause of ``other`` on top of it
    return _ORMJoin(
        spliced_left,
        other.right,
        other.onclause,
        isouter=other.isouter,
        _right_memo=other._right_memo,
    )

1959 

def join(
    self,
    right: _FromClauseArgument,
    onclause: Optional[_OnClauseArgument] = None,
    isouter: bool = False,
    full: bool = False,
) -> _ORMJoin:
    """Return a new :class:`._ORMJoin` that uses this join as its left side.

    :param right: the right side of the new join.
    :param onclause: optional ON clause, passed through unchanged.
    :param isouter: passed through as the ``isouter`` flag of the new join.
    :param full: passed through as the ``full`` flag of the new join.

    """
    joined = _ORMJoin(self, right, onclause, isouter=isouter, full=full)
    return joined

1968 

def outerjoin(
    self,
    right: _FromClauseArgument,
    onclause: Optional[_OnClauseArgument] = None,
    full: bool = False,
) -> _ORMJoin:
    """Return a new :class:`._ORMJoin` with this join as its left side and
    ``isouter`` always set to True.

    :param right: the right side of the new join.
    :param onclause: optional ON clause, passed through unchanged.
    :param full: passed through as the ``full`` flag of the new join.

    """
    joined = _ORMJoin(self, right, onclause, full=full, isouter=True)
    return joined

1976 

1977 

def with_parent(
    instance: object,
    prop: attributes.QueryableAttribute[Any],
    from_entity: Optional[_EntityType[Any]] = None,
) -> ColumnElement[bool]:
    """Create filtering criterion that relates this query's primary entity
    to the given related instance, using established
    :func:`_orm.relationship()`
    configuration.

    E.g.::

        stmt = select(Address).where(with_parent(some_user, User.addresses))

    The SQL rendered is the same as that rendered when a lazy loader
    would fire off from the given parent on that attribute; the needed
    state is taken from the parent object in Python, so no join to the
    parent table has to be rendered in the statement.

    The given attribute may also make use of
    :meth:`_orm.PropComparator.of_type` to indicate the left side of the
    criteria::

        a1 = aliased(Address)
        a2 = aliased(Address)
        stmt = select(a1, a2).where(
            with_parent(u1, User.addresses.of_type(a2))
        )

    The above use is equivalent to passing the ``from_entity`` argument::

        a1 = aliased(Address)
        a2 = aliased(Address)
        stmt = select(a1, a2).where(
            with_parent(u1, User.addresses, from_entity=a2)
        )

    :param instance:
      An instance which has some :func:`_orm.relationship`.

    :param prop:
      Class-bound attribute, which indicates
      what relationship from the instance should be used to reconcile the
      parent/child relationship.  Strings are rejected with
      ``ArgumentError``, as is a class-bound attribute whose property is
      not a relationship.

    :param from_entity:
      Entity in which to consider as the left side.  This defaults to the
      "zero" entity of the :class:`_query.Query` itself.  When ``prop``
      carries an ``of_type()`` entity, that entity is used instead.

      .. versionadded:: 1.2

    """
    prop_t: RelationshipProperty[Any]

    if isinstance(prop, str):
        raise sa_exc.ArgumentError(
            "with_parent() accepts class-bound mapped attributes, not strings"
        )

    if not isinstance(prop, attributes.QueryableAttribute):
        # not a class-bound attribute; the object is used directly
        prop_t = prop
    else:
        # an of_type() entity on the attribute replaces from_entity
        if prop._of_type:
            from_entity = prop._of_type

        mapper_property = prop.property
        is_relationship = mapper_property is not None and prop_is_relationship(
            mapper_property
        )
        if not is_relationship:
            raise sa_exc.ArgumentError(
                f"Expected relationship property for with_parent(), "
                f"got {mapper_property}"
            )
        prop_t = mapper_property

    return prop_t._with_parent(instance, from_entity=from_entity)

2055 

2056 

def has_identity(object_: object) -> bool:
    """Report whether the given object has a database identity.

    The answer is read from the ``has_identity`` attribute of the
    object's instance state.  This typically corresponds to the object
    being in either the persistent or detached state.

    .. seealso::

        :func:`.was_deleted`

    """
    return attributes.instance_state(object_).has_identity

2071 

2072 

def was_deleted(object_: object) -> bool:
    """Report whether the given object was deleted within a session flush.

    The answer is read from the ``was_deleted`` attribute of the object's
    instance state, and is regardless of whether or not the object is
    persistent or detached.

    .. seealso::

        :attr:`.InstanceState.was_deleted`

    """
    return attributes.instance_state(object_).was_deleted

2088 

2089 

def _entity_corresponds_to(
    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
) -> bool:
    """Tell whether 'given' corresponds to 'entity', in the sense of an
    entity passed to Query matching the same entity being referred to
    elsewhere in the query.

    """
    if insp_is_aliased_class(entity):
        # an aliased entity only matches another aliased entity that
        # shares the same base alias
        if not insp_is_aliased_class(given):
            return False
        return entity._base_alias() is given._base_alias()

    if insp_is_aliased_class(given):
        # plain entity vs. aliased 'given'
        if not given._use_mapper_path:
            return entity is given
        return entity in given.with_polymorphic_mappers

    # neither side is aliased
    assert insp_is_mapper(given)
    return entity.common_parent(given)

2111 

2112 

def _entity_corresponds_to_use_path_impl(
    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
) -> bool:
    """Tell whether 'given' corresponds to 'entity', in the sense of a
    path of loader options where a mapped attribute is taken to be a
    member of a parent entity.

    e.g.::

        someoption(A).someoption(A.b)  # -> fn(A, A) -> True
        someoption(A).someoption(C.d)  # -> fn(A, C) -> False

        a1 = aliased(A)
        someoption(a1).someoption(A.b)  # -> fn(a1, A) -> False
        someoption(a1).someoption(a1.b)  # -> fn(a1, a1) -> True

        wp = with_polymorphic(A, [A1, A2])
        someoption(wp).someoption(A1.foo)  # -> fn(wp, A1) -> False
        someoption(wp).someoption(wp.A1.foo)  # -> fn(wp, wp.A1) -> True

    """
    given_is_alias = insp_is_aliased_class(given)
    entity_is_alias = insp_is_aliased_class(entity)

    if given_is_alias:
        # aliased 'given': 'entity' must be an alias that doesn't use the
        # mapper path, and be 'given' itself or one of its
        # with_polymorphic entities
        return (
            entity_is_alias
            and not entity._use_mapper_path
            and (given is entity or entity in given._with_polymorphic_entities)
        )

    if entity_is_alias:
        # plain 'given' against an aliased 'entity'
        return (
            entity._use_mapper_path
            and given in entity.with_polymorphic_mappers
        )

    # neither side is aliased
    return given.isa(entity.mapper)

2148 

2149 

def _entity_isa(given: _InternalEntityType[Any], mapper: Mapper[Any]) -> bool:
    """Tell whether 'given' "is a" mapper, in terms of the given
    would load rows of type 'mapper'.

    """
    # no alias and no with_polymorphic mappers: a plain isa() check
    if not given.is_aliased_class and not given.with_polymorphic_mappers:
        return given.isa(mapper)

    # membership in the with_polymorphic mappers is enough on its own
    if mapper in given.with_polymorphic_mappers:
        return True

    # otherwise fall back to isa(), against the aliased entity's mapper
    # when 'given' is an alias, or against 'given' itself
    base = given.mapper if given.is_aliased_class else given
    return base.isa(mapper)

2163 

2164 

def _getitem(iterable_query: Query[Any], item: Any) -> Any:
    """Implement ``__getitem__`` for an iterable query object that also
    provides a ``slice()`` method.

    A slice is applied through ``iterable_query.slice()`` and returned as
    a list; any other item is fetched as the single-element slice
    ``[item : item + 1]`` of ``iterable_query`` and its first element is
    returned.  ``IndexError`` is raised for negative slice bounds and for
    the index ``-1``.

    """

    def _reject_negative():
        raise IndexError(
            "negative indexes are not accepted by SQL "
            "index / slice operators"
        )

    if not isinstance(item, slice):
        # scalar index
        if item == -1:
            _reject_negative()
        else:
            return list(iterable_query[item : item + 1])[0]
    else:
        start, stop, step = util.decode_slice(item)
        start_is_int = isinstance(start, int)
        stop_is_int = isinstance(stop, int)

        # an empty or inverted integer range needs no query at all
        if stop_is_int and start_is_int and stop - start <= 0:
            return []

        if (start_is_int and start < 0) or (stop_is_int and stop < 0):
            _reject_negative()

        rows = list(iterable_query.slice(start, stop))
        if step is None:
            return rows
        # the step is applied in Python, on the fetched rows
        return rows[None : None : item.step]

2202 

2203 

def _is_mapped_annotation(
    raw_annotation: _AnnotationScanType,
    cls: Type[Any],
    originating_cls: Type[Any],
) -> bool:
    """Tell whether ``raw_annotation`` resolves to an annotation whose
    origin is ``_MappedAnnotationBase``.

    The annotation is de-stringified against the module of
    ``originating_cls``; if a name in it can't be resolved, False is
    returned rather than raising.

    """
    try:
        resolved = de_stringify_annotation(
            cls, raw_annotation, originating_cls.__module__
        )
    except NameError:
        # in most cases, at least within our own tests, we can raise
        # here, which is more accurate as it prevents us from returning
        # false negatives.  However, in the real world, try to avoid getting
        # involved with end-user annotations that have nothing to do with us.
        # see issue #8888 where we bypass using this function in the case
        # that we want to detect an unresolvable Mapped[] type.
        return False

    return is_origin_of_cls(resolved, _MappedAnnotationBase)

2223 

2224 

class _CleanupError(Exception):
    """Raised by ``_cleanup_mapped_str_annotation()`` when the container
    type named in a string annotation can't be resolved to a symbol."""

2227 

2228 

def _cleanup_mapped_str_annotation(
    annotation: str, originating_module: str
) -> str:
    """Rewrite a string annotation so that a relationship target inside a
    ``Mapped[...]`` style container is itself quoted.

    An annotation of the form ``'Mapped[List[Address]]'`` becomes
    ``'Mapped[List["Address"]]'``, which allows "Address" to be retrieved
    as a string.  The outermost container name is also resolved to a real
    symbol, since this is the place where that has to happen.

    Annotations that are not of the ``name[...]`` form, or whose container
    is neither ``ClassVar`` nor a ``_MappedAnnotationBase`` subclass, are
    returned unchanged.

    :raises _CleanupError: if the container name can't be resolved in
     ``originating_module``.

    """
    nested: Optional[Match[str]]

    outer = re.match(r"^(.+?)\[(.+)\]$", annotation)
    if outer is None:
        return annotation

    # ticket #8759.  Resolve the Mapped name to a real symbol.
    # originally this just checked the name.
    try:
        obj = eval_name_only(outer.group(1), originating_module)
    except NameError as ne:
        raise _CleanupError(
            f'For annotation "{annotation}", could not resolve '
            f'container type "{outer.group(1)}". '
            "Please ensure this type is imported at the module level "
            "outside of TYPE_CHECKING blocks"
        ) from ne

    if obj is typing.ClassVar:
        real_symbol = "ClassVar"
    else:
        try:
            is_mapped_container = issubclass(obj, _MappedAnnotationBase)
        except TypeError:
            # avoid isinstance(obj, type) check, just catch TypeError
            return annotation
        if not is_mapped_container:
            return annotation
        real_symbol = obj.__name__

    # break "X[Y[Z]]" into ["X", "Y", "Z"], using the resolved symbol
    # name for the outermost element
    stack = [real_symbol]
    remainder = outer.group(2)
    while True:
        nested = re.match(r"^(.+?)\[(.+)\]$", remainder)
        if nested is None:
            stack.append(remainder)
            break
        stack.append(nested.group(1))
        remainder = nested.group(2)

    # stacks we want to rewrite, that is, quote the last entry which
    # we think is a relationship class name:
    #
    #   ['Mapped', 'List', 'Address']
    #   ['Mapped', 'A']
    #
    # stacks we dont want to rewrite, which are generally MappedColumn
    # use cases:
    #
    # ['Mapped', "'Optional[Dict[str, str]]'"]
    # ['Mapped', 'dict[str, str] | None']
    innermost = stack[-1]
    if (
        # avoid already quoted symbols such as
        # ['Mapped', "'Optional[Dict[str, str]]'"]
        not re.match(r"""^["'].*["']$""", innermost)
        # avoid further generics like Dict[] such as
        # ['Mapped', 'dict[str, str] | None']
        and not re.match(r".*\[.*\]", innermost)
    ):
        stripchars = "\"' "
        quoted = [f'"{elem.strip(stripchars)}"' for elem in innermost.split(",")]
        stack[-1] = ", ".join(quoted)

        annotation = "[".join(stack) + ("]" * (len(stack) - 1))

    return annotation

2315 

2316 

def _extract_mapped_subtype(
    raw_annotation: Optional[_AnnotationScanType],
    cls: type,
    originating_module: str,
    key: str,
    attr_cls: Type[Any],
    required: bool,
    is_dataclass_field: bool,
    expect_mapped: bool = True,
    raiseerr: bool = True,
) -> Optional[Tuple[Union[_AnnotationScanType, str], Optional[type]]]:
    """given an annotation, figure out if it's ``Mapped[something]`` and if
    so, return the ``something`` part.

    Includes error raise scenarios and other options.

    :param raw_annotation: the annotation to inspect, or None if the
     attribute has no annotation.
    :param cls: class the attribute belongs to; passed to
     ``de_stringify_annotation()`` and used in error messages.
    :param originating_module: module name passed to
     ``de_stringify_annotation()`` for resolving names.
    :param key: attribute name, used in error messages.
    :param attr_cls: the construct class, used in the "annotation is
     required" error message.
    :param required: if True, a missing annotation raises
     ``ArgumentError`` rather than returning None.
    :param is_dataclass_field: if True, the resolved annotation is returned
     as-is with no origin, without checking for ``Mapped[]``.
    :param expect_mapped: if True, an annotation that is not a
     ``_MappedAnnotationBase`` generic either returns None or raises; if
     False, such an annotation is returned as-is with no origin.
    :param raiseerr: if False, the error cases governed by this flag return
     None (or fall back to the raw annotation) instead of raising.

    :return: None, or a two-tuple of ``(annotation, origin)`` where
     ``origin`` is None unless the annotation was a
     ``_MappedAnnotationBase`` generic with exactly one argument.

    :raises ArgumentError: for a missing but required annotation, an
     annotation that can't be interpreted, a non-``Mapped[]`` annotation
     where one is expected, or a ``Mapped[]`` annotation that does not
     have exactly one argument.

    """

    if raw_annotation is None:
        if required:
            raise sa_exc.ArgumentError(
                f"Python typing annotation is required for attribute "
                f'"{cls.__name__}.{key}" when primary argument(s) for '
                f'"{attr_cls.__name__}" construct are None or not present'
            )
        return None

    try:
        annotated = de_stringify_annotation(
            cls,
            raw_annotation,
            originating_module,
            str_cleanup_fn=_cleanup_mapped_str_annotation,
        )
    except _CleanupError as ce:
        # the cleanup function could not resolve the container type
        raise sa_exc.ArgumentError(
            f"Could not interpret annotation {raw_annotation}. "
            "Check that it uses names that are correctly imported at the "
            "module level. See chained stack trace for more hints."
        ) from ce
    except NameError as ne:
        # only raise for annotations that textually contain "Mapped[";
        # NOTE(review): the "in" test presumably expects raw_annotation to
        # be a string at this point -- confirm against callers
        if raiseerr and "Mapped[" in raw_annotation:  # type: ignore
            raise sa_exc.ArgumentError(
                f"Could not interpret annotation {raw_annotation}. "
                "Check that it uses names that are correctly imported at the "
                "module level. See chained stack trace for more hints."
            ) from ne

        # otherwise continue with the unresolved annotation as given
        annotated = raw_annotation  # type: ignore

    if is_dataclass_field:
        return annotated, None
    else:
        if not hasattr(annotated, "__origin__") or not is_origin_of_cls(
            annotated, _MappedAnnotationBase
        ):
            # not a Mapped[]-style generic
            if expect_mapped:
                if not raiseerr:
                    return None

                origin = getattr(annotated, "__origin__", None)
                if origin is typing.ClassVar:
                    return None

                # check for other kind of ORM descriptor like AssociationProxy,
                # don't raise for that (issue #9957)
                elif isinstance(origin, type) and issubclass(
                    origin, ORMDescriptor
                ):
                    return None

                raise sa_exc.ArgumentError(
                    f'Type annotation for "{cls.__name__}.{key}" '
                    "can't be correctly interpreted for "
                    "Annotated Declarative Table form. ORM annotations "
                    "should normally make use of the ``Mapped[]`` generic "
                    "type, or other ORM-compatible generic type, as a "
                    "container for the actual type, which indicates the "
                    "intent that the attribute is mapped. "
                    "Class variables that are not intended to be mapped "
                    "by the ORM should use ClassVar[]. "
                    "To allow Annotated Declarative to disregard legacy "
                    "annotations which don't use Mapped[] to pass, set "
                    '"__allow_unmapped__ = True" on the class or a '
                    "superclass this class.",
                    code="zlpr",
                )

            else:
                return annotated, None

        # a Mapped[]-style generic must carry exactly one argument
        if len(annotated.__args__) != 1:
            raise sa_exc.ArgumentError(
                "Expected sub-type for Mapped[] annotation"
            )

        return (
            # fix dict/list/set args to be ForwardRef, see #11814
            fixup_container_fwd_refs(annotated.__args__[0]),
            annotated.__origin__,
        )

2418 

2419 

def _mapper_property_as_plain_name(prop: Type[Any]) -> str:
    """Return the plain name for ``prop`` as produced by
    ``util.clsname_as_plain_name()``.

    If ``prop`` has a ``_mapper_property_name()`` hook, its result is
    passed along as the name; otherwise None is passed.

    """
    name = (
        prop._mapper_property_name()
        if hasattr(prop, "_mapper_property_name")
        else None
    )
    return util.clsname_as_plain_name(prop, name)