Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/util.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

813 statements  

1# orm/util.py 

2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9from __future__ import annotations 

10 

11import enum 

12import functools 

13import re 

14import types 

15import typing 

16from typing import AbstractSet 

17from typing import Any 

18from typing import Callable 

19from typing import cast 

20from typing import Dict 

21from typing import FrozenSet 

22from typing import Generic 

23from typing import Iterable 

24from typing import Iterator 

25from typing import List 

26from typing import Match 

27from typing import Optional 

28from typing import Protocol 

29from typing import Sequence 

30from typing import Tuple 

31from typing import Type 

32from typing import TYPE_CHECKING 

33from typing import TypeVar 

34from typing import Union 

35import weakref 

36 

37from . import attributes # noqa 

38from . import exc 

39from ._typing import _O 

40from ._typing import insp_is_aliased_class 

41from ._typing import insp_is_mapper 

42from ._typing import prop_is_relationship 

43from .base import _class_to_mapper as _class_to_mapper 

44from .base import _MappedAnnotationBase 

45from .base import _never_set as _never_set # noqa: F401 

46from .base import _none_set as _none_set # noqa: F401 

47from .base import attribute_str as attribute_str # noqa: F401 

48from .base import class_mapper as class_mapper 

49from .base import DynamicMapped 

50from .base import InspectionAttr as InspectionAttr 

51from .base import instance_str as instance_str # noqa: F401 

52from .base import Mapped 

53from .base import object_mapper as object_mapper 

54from .base import object_state as object_state # noqa: F401 

55from .base import opt_manager_of_class 

56from .base import ORMDescriptor 

57from .base import state_attribute_str as state_attribute_str # noqa: F401 

58from .base import state_class_str as state_class_str # noqa: F401 

59from .base import state_str as state_str # noqa: F401 

60from .base import WriteOnlyMapped 

61from .interfaces import CriteriaOption 

62from .interfaces import MapperProperty as MapperProperty 

63from .interfaces import ORMColumnsClauseRole 

64from .interfaces import ORMEntityColumnsClauseRole 

65from .interfaces import ORMFromClauseRole 

66from .path_registry import PathRegistry as PathRegistry 

67from .. import event 

68from .. import exc as sa_exc 

69from .. import inspection 

70from .. import sql 

71from .. import util 

72from ..engine.result import result_tuple 

73from ..sql import coercions 

74from ..sql import expression 

75from ..sql import lambdas 

76from ..sql import roles 

77from ..sql import util as sql_util 

78from ..sql import visitors 

79from ..sql._typing import is_selectable 

80from ..sql.annotation import SupportsCloneAnnotations 

81from ..sql.base import ColumnCollection 

82from ..sql.cache_key import HasCacheKey 

83from ..sql.cache_key import MemoizedHasCacheKey 

84from ..sql.elements import ColumnElement 

85from ..sql.elements import KeyedColumnElement 

86from ..sql.selectable import FromClause 

87from ..util.langhelpers import MemoizedSlots 

88from ..util.typing import de_stringify_annotation as _de_stringify_annotation 

89from ..util.typing import ( 

90 de_stringify_union_elements as _de_stringify_union_elements, 

91) 

92from ..util.typing import eval_name_only as _eval_name_only 

93from ..util.typing import is_origin_of_cls 

94from ..util.typing import Literal 

95from ..util.typing import TupleAny 

96from ..util.typing import typing_get_origin 

97from ..util.typing import Unpack 

98 

99if typing.TYPE_CHECKING: 

100 from ._typing import _EntityType 

101 from ._typing import _IdentityKeyType 

102 from ._typing import _InternalEntityType 

103 from ._typing import _ORMCOLEXPR 

104 from .context import _MapperEntity 

105 from .context import ORMCompileState 

106 from .mapper import Mapper 

107 from .path_registry import AbstractEntityRegistry 

108 from .query import Query 

109 from .relationships import RelationshipProperty 

110 from ..engine import Row 

111 from ..engine import RowMapping 

112 from ..sql._typing import _CE 

113 from ..sql._typing import _ColumnExpressionArgument 

114 from ..sql._typing import _EquivalentColumnMap 

115 from ..sql._typing import _FromClauseArgument 

116 from ..sql._typing import _OnClauseArgument 

117 from ..sql._typing import _PropagateAttrsType 

118 from ..sql.annotation import _SA 

119 from ..sql.base import ReadOnlyColumnCollection 

120 from ..sql.elements import BindParameter 

121 from ..sql.selectable import _ColumnsClauseElement 

122 from ..sql.selectable import Select 

123 from ..sql.selectable import Selectable 

124 from ..sql.visitors import anon_map 

125 from ..util.typing import _AnnotationScanType 

126 from ..util.typing import ArgsTypeProcotol 

127 

# general purpose type variable for the helpers in this module
_T = TypeVar("_T", bound=Any)

# the full vocabulary of cascade names, including the "all" and "none"
# shorthands; CascadeOptions validates its input against this set
all_cascades = frozenset(
    [
        "delete",
        "delete-orphan",
        "all",
        "merge",
        "expunge",
        "save-update",
        "refresh-expire",
        "none",
    ]
)


# a "partial of partial": calling ``_de_stringify_partial(fn)`` produces
# ``functools.partial(fn, locals_=...)``, where ``locals_`` pre-binds the
# ORM annotation container names below
_de_stringify_partial = functools.partial(
    functools.partial,
    locals_=util.immutabledict(
        {
            "Mapped": Mapped,
            "WriteOnlyMapped": WriteOnlyMapped,
            "DynamicMapped": DynamicMapped,
        }
    ),
)

# functools.partial buys very little here: each wrapped function still
# needs its complete signature spelled out (and kept in sync) as a
# Protocol so that it can be typed

157 

158 

class _DeStringifyAnnotation(Protocol):
    """Call signature of the ``de_stringify_annotation`` callable exposed
    by this module.

    """

    def __call__(
        self,
        cls: Type[Any],
        annotation: _AnnotationScanType,
        originating_module: str,
        *,
        str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
        include_generic: bool = False,
    ) -> Type[Any]:
        """Typing stub only; carries no implementation."""
        ...


# the underlying function with ``locals_`` already bound; the cast
# supplies the signature that functools.partial does not carry for
# type checkers
de_stringify_annotation = cast(
    _DeStringifyAnnotation,
    _de_stringify_partial(_de_stringify_annotation),
)

174 

175 

class _DeStringifyUnionElements(Protocol):
    """Call signature of the ``de_stringify_union_elements`` callable
    exposed by this module.

    """

    def __call__(
        self,
        cls: Type[Any],
        annotation: ArgsTypeProcotol,
        originating_module: str,
        *,
        str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
    ) -> Type[Any]:
        """Typing stub only; carries no implementation."""
        ...


# the underlying function with ``locals_`` already bound, cast to the
# Protocol above so that callers see a real signature
de_stringify_union_elements = cast(
    _DeStringifyUnionElements,
    _de_stringify_partial(_de_stringify_union_elements),
)

191 

192 

class _EvalNameOnly(Protocol):
    """Call signature of the ``eval_name_only`` callable exposed by this
    module.

    """

    def __call__(self, name: str, module_name: str) -> Any:
        """Typing stub only; carries no implementation."""
        ...


# the underlying function with ``locals_`` already bound, cast to the
# Protocol above so that callers see a real signature
eval_name_only = cast(
    _EvalNameOnly,
    _de_stringify_partial(_eval_name_only),
)

198 

199 

class CascadeOptions(FrozenSet[str]):
    """Keeps track of the options sent to
    :paramref:`.relationship.cascade`.

    The object is a ``frozenset`` of the effective cascade names and
    additionally exposes each individual cascade as a boolean attribute
    (``save_update``, ``delete``, ``refresh_expire``, ``merge``,
    ``expunge``, ``delete_orphan``).

    """

    # cascades switched on by the "all" shorthand; note that
    # "delete-orphan" is not among them
    _add_w_all_cascades = all_cascades.difference(
        ["all", "none", "delete-orphan"]
    )
    _allowed_cascades = all_cascades

    # not consulted within this class; presumably the cascades permitted
    # for viewonly relationships -- confirm against the consumer
    _viewonly_cascades = ["expunge", "all", "none", "refresh-expire", "merge"]

    __slots__ = (
        "save_update",
        "delete",
        "refresh_expire",
        "merge",
        "expunge",
        "delete_orphan",
    )

    save_update: bool
    delete: bool
    refresh_expire: bool
    merge: bool
    expunge: bool
    delete_orphan: bool

    def __new__(
        cls, value_list: Optional[Union[Iterable[str], str]]
    ) -> CascadeOptions:
        """Build the option set from a string or an iterable of names.

        :param value_list: a comma separated string of cascade names, an
         iterable of cascade names, or ``None`` for no cascades.  Strings
         and ``None`` are routed through :meth:`.from_string`.
        :raises sqlalchemy.exc.ArgumentError: if a name is not one of the
         allowed cascades.

        """
        if isinstance(value_list, str) or value_list is None:
            return cls.from_string(value_list)  # type: ignore

        values = set(value_list)

        # compute the offending names once; used for both the check and
        # the error message
        invalid = values.difference(cls._allowed_cascades)
        if invalid:
            raise sa_exc.ArgumentError(
                "Invalid cascade option(s): %s"
                % ", ".join(repr(x) for x in sorted(invalid))
            )

        # expand "all", then let "none" override everything; neither
        # shorthand is kept in the final set
        if "all" in values:
            values.update(cls._add_w_all_cascades)
        if "none" in values:
            values.clear()
        values.discard("all")

        self = super().__new__(cls, values)
        self.save_update = "save-update" in values
        self.delete = "delete" in values
        self.refresh_expire = "refresh-expire" in values
        self.merge = "merge" in values
        self.expunge = "expunge" in values
        self.delete_orphan = "delete-orphan" in values

        if self.delete_orphan and not self.delete:
            util.warn("The 'delete-orphan' cascade option requires 'delete'.")
        return self

    def __repr__(self):
        return "CascadeOptions(%r)" % (",".join(sorted(self)),)

    @classmethod
    def from_string(cls, arg):
        """Parse a comma separated string of cascade names.

        Whitespace around the commas as well as at either end of the
        string is ignored, as are empty entries.  ``None`` and the empty
        string produce an empty set of cascades.

        """
        # strip the string as a whole first; the split pattern only
        # consumes whitespace adjacent to a comma, so without this a
        # value such as " all" would be rejected as an invalid option
        values = [c for c in re.split(r"\s*,\s*", (arg or "").strip()) if c]
        return cls(values)

271 

272 

def _validator_events(desc, key, validator, include_removes, include_backrefs):
    """Attach attribute event listeners which route values through a
    validation function.

    :param desc: the target passed to :func:`.event.listen`.
    :param key: attribute key; handed to the validator and used to look
     up the attribute implementation when detecting backrefs.
    :param validator: callable invoked as ``validator(obj, key, value)``,
     or as ``validator(obj, key, value, is_remove)`` when
     ``include_removes`` is set.
    :param include_removes: if True, "remove" events are validated as
     well and the validator receives the additional boolean argument.
    :param include_backrefs: if False, events whose initiator refers to
     a different attribute implementation than ``key`` are passed
     through without validation.

    """

    if not include_backrefs:

        # only consulted when include_backrefs is False, as every use
        # below short circuits on that flag first
        def detect_is_backref(state, initiator):
            local_impl = state.manager[key].impl
            return initiator.impl is not local_impl

    if include_removes:
        # four-argument validator form: the trailing flag indicates
        # whether or not the event is a removal

        def append(state, value, initiator):
            # bulk replace is validated as a whole by bulk_set()
            if initiator.op is attributes.OP_BULK_REPLACE:
                return value
            if not include_backrefs and detect_is_backref(state, initiator):
                return value
            return validator(state.obj(), key, value, False)

        def bulk_set(state, values, initiator):
            if not include_backrefs and detect_is_backref(state, initiator):
                return
            target = state.obj()
            # replace the contents of the incoming list in place
            values[:] = [
                validator(target, key, item, False) for item in values
            ]

        def set_(state, value, oldvalue, initiator):
            if not include_backrefs and detect_is_backref(state, initiator):
                return value
            return validator(state.obj(), key, value, False)

        def remove(state, value, initiator):
            if not include_backrefs and detect_is_backref(state, initiator):
                return
            validator(state.obj(), key, value, True)

    else:
        # three-argument validator form

        def append(state, value, initiator):
            # bulk replace is validated as a whole by bulk_set()
            if initiator.op is attributes.OP_BULK_REPLACE:
                return value
            if not include_backrefs and detect_is_backref(state, initiator):
                return value
            return validator(state.obj(), key, value)

        def bulk_set(state, values, initiator):
            if not include_backrefs and detect_is_backref(state, initiator):
                return
            target = state.obj()
            # replace the contents of the incoming list in place
            values[:] = [validator(target, key, item) for item in values]

        def set_(state, value, oldvalue, initiator):
            if not include_backrefs and detect_is_backref(state, initiator):
                return value
            return validator(state.obj(), key, value)

    event.listen(desc, "append", append, raw=True, retval=True)
    event.listen(desc, "bulk_replace", bulk_set, raw=True)
    event.listen(desc, "set", set_, raw=True, retval=True)
    if include_removes:
        event.listen(desc, "remove", remove, raw=True, retval=True)

337 

338 

def polymorphic_union(
    table_map, typecolname, aliasname="p_union", cast_nulls=True
):
    """Create a ``UNION`` statement used by a polymorphic mapper.

    See :ref:`concrete_inheritance` for an example of how
    this is used.

    The given ``table_map`` is not modified; FROM clause coercion of its
    values takes place on an internal copy.

    :param table_map: mapping of polymorphic identities to
     :class:`_schema.Table` objects.
    :param typecolname: string name of a "discriminator" column, which will be
     derived from the query, producing the polymorphic identity for
     each row.  If ``None``, no polymorphic discriminator is generated.
    :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()`
     construct generated.
    :param cast_nulls: if True, non-existent columns, which are represented
     as labeled NULLs, will be passed into CAST.   This is a legacy behavior
     that is problematic on some backends such as Oracle - in which case it
     can be set to False.
    :raises sqlalchemy.exc.InvalidRequestError: if one of the tables
     already has a column whose key equals ``typecolname``.

    """

    colnames: util.OrderedSet[str] = util.OrderedSet()
    colnamemaps = {}
    # named "coltypes" rather than "types" so that the "types" module
    # imported by this file is not shadowed
    coltypes = {}

    # the coerced FROM clauses are collected into a private mapping; the
    # caller's mapping is read but never written to, which also allows
    # read-only mappings to be passed
    coerced_map = {}
    for key, table in table_map.items():
        table = coercions.expect(
            roles.StrictFromClauseRole, table, allow_select=True
        )
        coerced_map[key] = table

        m = {}
        for c in table.c:
            if c.key == typecolname:
                raise sa_exc.InvalidRequestError(
                    "Polymorphic union can't use '%s' as the discriminator "
                    "column due to mapped column %r; please apply the "
                    "'typecolname' "
                    "argument; this is available on "
                    "ConcreteBase as '_concrete_discriminator_name'"
                    % (typecolname, c)
                )
            colnames.add(c.key)
            m[c.key] = c
            coltypes[c.key] = c.type
        colnamemaps[table] = m

    def col(name, table):
        # a column which the given table lacks is rendered as a typed,
        # labeled NULL so that every SELECT in the UNION lines up
        try:
            return colnamemaps[table][name]
        except KeyError:
            if cast_nulls:
                return sql.cast(sql.null(), coltypes[name]).label(name)
            else:
                return sql.type_coerce(sql.null(), coltypes[name]).label(name)

    result = []
    for type_, table in coerced_map.items():
        columns = [col(name, table) for name in colnames]
        if typecolname is not None:
            # the polymorphic identity is rendered inline as the
            # discriminator column
            columns.append(
                sql.literal_column(sql_util._quote_ddl_expr(type_)).label(
                    typecolname
                )
            )
        result.append(sql.select(*columns).select_from(table))
    return sql.union_all(*result).alias(aliasname)

419 

420 

def identity_key(
    class_: Optional[Type[_T]] = None,
    ident: Union[Any, Tuple[Any, ...]] = None,
    *,
    instance: Optional[_T] = None,
    row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None,
    identity_token: Optional[Any] = None,
) -> _IdentityKeyType[_T]:
    r"""Produce an "identity key" tuple of the kind used as keys in the
    :attr:`.Session.identity_map` dictionary.

    Three calling styles are supported:

    * ``identity_key(class, ident, identity_token=token)``

      Accepts a mapped class along with a primary key given as a scalar
      or a tuple.

      E.g.::

        >>> identity_key(MyClass, (1, 2))
        (<class '__main__.MyClass'>, (1, 2), None)

      :param class: mapped class (must be a positional argument)
      :param ident: primary key, may be a scalar or tuple argument.
      :param identity_token: optional identity token

        .. versionadded:: 1.2 added identity_token


    * ``identity_key(instance=instance)``

      Produces the identity key of the given instance.  The instance
      does not have to be persistent; it is sufficient that its primary
      key attributes are populated (the key will otherwise contain
      ``None`` for the values that are missing).

      E.g.::

        >>> instance = MyClass(1, 2)
        >>> identity_key(instance=instance)
        (<class '__main__.MyClass'>, (1, 2), None)

      The instance is passed along to
      :meth:`_orm.Mapper.identity_key_from_instance`, which has the
      effect of performing a database check for the corresponding row
      if the object is expired.

      :param instance: object instance (must be given as a keyword arg)

    * ``identity_key(class, row=row, identity_token=token)``

      Like the class / primary key form, except that a database result
      row, given as a :class:`.Row` or :class:`.RowMapping` object,
      takes the place of the primary key.

      E.g.::

        >>> row = engine.execute(\
            text("select * from table where a=1 and b=2")\
            ).first()
        >>> identity_key(MyClass, row=row)
        (<class '__main__.MyClass'>, (1, 2), None)

      :param class: mapped class (must be a positional argument)
      :param row: :class:`.Row` row returned by a :class:`_engine.CursorResult`
       (must be given as a keyword arg)
      :param identity_token: optional identity token

        .. versionadded:: 1.2 added identity_token

    If a class is given, it takes precedence and ``instance`` is not
    consulted.

    :raises sqlalchemy.exc.ArgumentError: if neither a class nor an
     instance is given, or if a class is given without ``ident`` or
     ``row``.

    """
    if class_ is None:
        # instance form
        if instance is None:
            raise sa_exc.ArgumentError("class or instance is required")
        return object_mapper(instance).identity_key_from_instance(instance)

    # class form; the mapper is resolved before the remaining arguments
    # are checked
    cls_mapper = class_mapper(class_)

    if row is not None:
        return cls_mapper.identity_key_from_row(
            row, identity_token=identity_token
        )

    if ident is None:
        raise sa_exc.ArgumentError("ident or row is required")

    # a scalar identity is normalized into a one-element tuple
    pk = tuple(util.to_list(ident))
    return cls_mapper.identity_key_from_primary_key(
        pk, identity_token=identity_token
    )

509 

510 

511class _TraceAdaptRole(enum.Enum): 

512 """Enumeration of all the use cases for ORMAdapter. 

513 

514 ORMAdapter remains one of the most complicated aspects of the ORM, as it is 

515 used for in-place adaption of column expressions to be applied to a SELECT, 

516 replacing :class:`.Table` and other objects that are mapped to classes with 

517 aliases of those tables in the case of joined eager loading, or in the case 

518 of polymorphic loading as used with concrete mappings or other custom "with 

519 polymorphic" parameters, with whole user-defined subqueries. The 

520 enumerations provide an overview of all the use cases used by ORMAdapter, a 

521 layer of formality as to the introduction of new ORMAdapter use cases (of 

522 which none are anticipated), as well as a means to trace the origins of a 

523 particular ORMAdapter within runtime debugging. 

524 

525 SQLAlchemy 2.0 has greatly scaled back ORM features which relied heavily on 

526 open-ended statement adaption, including the ``Query.with_polymorphic()`` 

527 method and the ``Query.select_from_entity()`` methods, favoring 

528 user-explicit aliasing schemes using the ``aliased()`` and 

529 ``with_polymorphic()`` standalone constructs; these still use adaption, 

530 however the adaption is applied in a narrower scope. 

531 

532 """ 

533 

534 # aliased() use that is used to adapt individual attributes at query 

535 # construction time 

536 ALIASED_INSP = enum.auto() 

537 

538 # joinedload cases; typically adapt an ON clause of a relationship 

539 # join 

540 JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto() 

541 JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto() 

542 JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto() 

543 

544 # polymorphic cases - these are complex ones that replace FROM 

545 # clauses, replacing tables with subqueries 

546 MAPPER_POLYMORPHIC_ADAPTER = enum.auto() 

547 WITH_POLYMORPHIC_ADAPTER = enum.auto() 

548 WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto() 

549 DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto() 

550 

551 # the from_statement() case, used only to adapt individual attributes 

552 # from a given statement to local ORM attributes at result fetching 

553 # time. assigned to ORMCompileState._from_obj_alias 

554 ADAPT_FROM_STATEMENT = enum.auto() 

555 

556 # the joinedload for queries that have LIMIT/OFFSET/DISTINCT case; 

557 # the query is placed inside of a subquery with the LIMIT/OFFSET/etc., 

558 # joinedloads are then placed on the outside. 

559 # assigned to ORMCompileState.compound_eager_adapter 

560 COMPOUND_EAGER_STATEMENT = enum.auto() 

561 

562 # the legacy Query._set_select_from() case. 

563 # this is needed for Query's set operations (i.e. UNION, etc. ) 

564 # as well as "legacy from_self()", which while removed from 2.0 as 

565 # public API, is used for the Query.count() method. this one 

566 # still does full statement traversal 

567 # assigned to ORMCompileState._from_obj_alias 

568 LEGACY_SELECT_FROM_ALIAS = enum.auto() 

569 

570 

class ORMStatementAdapter(sql_util.ColumnAdapter):
    """A ColumnAdapter that also carries the :class:`._TraceAdaptRole`
    it was created for.

    """

    __slots__ = ("role",)

    def __init__(
        self,
        role: _TraceAdaptRole,
        selectable: Selectable,
        *,
        equivalents: Optional[_EquivalentColumnMap] = None,
        adapt_required: bool = False,
        allow_label_resolve: bool = True,
        anonymize_labels: bool = False,
        adapt_on_names: bool = False,
        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
    ):
        """Record ``role``; every other argument is handed to the
        ColumnAdapter constructor unchanged.

        """
        self.role = role

        super().__init__(
            selectable,
            equivalents=equivalents,
            adapt_required=adapt_required,
            adapt_on_names=adapt_on_names,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            adapt_from_selectables=adapt_from_selectables,
        )

598 

599 

class ORMAdapter(sql_util.ColumnAdapter):
    """A ColumnAdapter which leaves alone those elements that belong to
    mappers unrelated to the adapter's own mapper.

    """

    __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp")

    is_aliased_class: bool
    aliased_insp: Optional[AliasedInsp[Any]]

    def __init__(
        self,
        role: _TraceAdaptRole,
        entity: _InternalEntityType[Any],
        *,
        equivalents: Optional[_EquivalentColumnMap] = None,
        adapt_required: bool = False,
        allow_label_resolve: bool = True,
        anonymize_labels: bool = False,
        selectable: Optional[Selectable] = None,
        limit_on_entity: bool = True,
        adapt_on_names: bool = False,
        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
    ):
        """Construct the adapter for an inspected entity.

        :param role: the :class:`._TraceAdaptRole` recorded on the
         adapter.
        :param entity: inspected entity; supplies the mapper as well as
         the default selectable.
        :param selectable: selectable to adapt to; when omitted,
         ``entity.selectable`` is used.
        :param limit_on_entity: when True, :meth:`._include_fn` is
         installed as the adapter's ``include_fn``; when False no
         ``include_fn`` is used.

        The remaining arguments are passed to the ColumnAdapter
        constructor unchanged.

        """
        self.role = role
        self.mapper = entity.mapper

        if selectable is None:
            selectable = entity.selectable

        if insp_is_aliased_class(entity):
            self.is_aliased_class, self.aliased_insp = True, entity
        else:
            self.is_aliased_class, self.aliased_insp = False, None

        if limit_on_entity:
            include_fn = self._include_fn
        else:
            include_fn = None

        super().__init__(
            selectable,
            equivalents=equivalents,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            include_fn=include_fn,
            adapt_on_names=adapt_on_names,
            adapt_from_selectables=adapt_from_selectables,
        )

    def _include_fn(self, elem):
        """Indicate whether ``elem`` may be adapted.

        Elements without a "parentmapper" annotation are always
        included; annotated ones only if that mapper and this adapter's
        mapper are related via ``isa()`` in either direction.

        """
        parent = elem._annotations.get("parentmapper", None)

        if not parent:
            return True

        return parent.isa(self.mapper) or self.mapper.isa(parent)

651 

652 

class AliasedClass(
    inspection.Inspectable["AliasedInsp[_O]"], ORMColumnsClauseRole[_O]
):
    r"""An "aliased" form of a mapped class, for usage with Query.

    This is the ORM counterpart to the
    :func:`~sqlalchemy.sql.expression.alias` construct.  The object
    imitates the mapped class by means of a ``__getattr__`` scheme, and
    holds onto a real :class:`~sqlalchemy.sql.expression.Alias` object.

    The main use of :class:`.AliasedClass` is to stand in for a mapped
    entity within a SQL statement generated by the ORM, so that the same
    entity may take part in the statement in more than one context.  A
    simple example::

        # find all pairs of users with the same name
        user_alias = aliased(User)
        session.query(User, user_alias).\
                        join((user_alias, User.id > user_alias.id)).\
                        filter(User.name == user_alias.name)

    An :class:`.AliasedClass` may also map an existing mapped class to an
    entirely new selectable, as long as that selectable is
    column-compatible with the existing mapped selectable, and it may be
    configured in a mapping as the target of a :func:`_orm.relationship`.
    See the links below for examples.

    Instances are normally created with the :func:`_orm.aliased`
    function; the :func:`_orm.with_polymorphic` function produces them as
    well, with additional configuration.

    The attribute scheme implemented by :class:`.AliasedClass` yields the
    same attribute and method interface as that of the original mapped
    class, which makes the object compatible with any attribute technique
    that works on the original class, including hybrid attributes (see
    :ref:`hybrids_toplevel`).

    The underlying :class:`_orm.Mapper`, the aliased selectable and
    further details are available by inspecting the object with
    :func:`_sa.inspect`::

        from sqlalchemy import inspect
        my_alias = aliased(MyClass)
        insp = inspect(my_alias)

    The inspection object returned is an instance of
    :class:`.AliasedInsp`.


    .. seealso::

        :func:`.aliased`

        :func:`.with_polymorphic`

        :ref:`relationship_aliased_class`

        :ref:`relationship_to_window_function`


    """

    __name__: str

    def __init__(
        self,
        mapped_class_or_ac: _EntityType[_O],
        alias: Optional[FromClause] = None,
        name: Optional[str] = None,
        flat: bool = False,
        adapt_on_names: bool = False,
        with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]] = None,
        with_polymorphic_discriminator: Optional[ColumnElement[Any]] = None,
        base_alias: Optional[AliasedInsp[Any]] = None,
        use_mapper_path: bool = False,
        represents_outer_join: bool = False,
    ):
        insp = cast(
            "_InternalEntityType[_O]", inspection.inspect(mapped_class_or_ac)
        )
        mapper = insp.mapper

        if alias is not None:
            # an explicit selectable was given for something that is
            # itself already an alias; the adapters are to be nested
            nest_adapters = bool(insp.is_aliased_class)
        else:
            nest_adapters = False
            if insp.is_aliased_class and insp.selectable._is_subquery:
                # alias of an alias that is against a subquery
                alias = insp.selectable.alias()
            else:
                poly_selectable = mapper._with_polymorphic_selectable
                alias = poly_selectable._anonymous_fromclause(
                    name=name,
                    flat=flat,
                )

        assert alias is not None

        # explicitly passed polymorphic settings win; otherwise the
        # mapper-level ones apply
        poly_mappers = (
            with_polymorphic_mappers or mapper.with_polymorphic_mappers
        )
        if with_polymorphic_discriminator is not None:
            poly_on = with_polymorphic_discriminator
        else:
            poly_on = mapper.polymorphic_on

        self._aliased_insp = AliasedInsp(
            self,
            insp,
            alias,
            name,
            poly_mappers,
            poly_on,
            base_alias,
            use_mapper_path,
            adapt_on_names,
            represents_outer_join,
            nest_adapters,
        )

        self.__name__ = f"aliased({mapper.class_.__name__})"

    @classmethod
    def _reconstitute_from_aliased_insp(
        cls, aliased_insp: AliasedInsp[_O]
    ) -> AliasedClass[_O]:
        """Create an AliasedClass around an existing AliasedInsp, without
        running ``__init__``.

        For a with_polymorphic AliasedInsp, every sub-entity other than
        the AliasedInsp itself is reconstituted as well and set as an
        attribute named after its class.

        """
        obj = cls.__new__(cls)
        obj._aliased_insp = aliased_insp
        obj.__name__ = f"aliased({aliased_insp.mapper.class_.__name__})"

        if not aliased_insp._is_with_polymorphic:
            return obj

        for sub_insp in aliased_insp._with_polymorphic_entities:
            if sub_insp is aliased_insp:
                continue
            sub_entity = AliasedClass._reconstitute_from_aliased_insp(
                sub_insp
            )
            setattr(obj, sub_insp.class_.__name__, sub_entity)

        return obj

    def __getattr__(self, key: str) -> Any:
        # only reached when regular attribute lookup fails; ahead of
        # _aliased_insp being assigned there is nothing to proxy to
        try:
            _aliased_insp = self.__dict__["_aliased_insp"]
        except KeyError:
            raise AttributeError()

        # go through plain getattr() so that all of its mechanics apply
        attr = getattr(_aliased_insp._target, key)

        # a bound method, which would be invoked against a "self";
        # return a new method of the same function bound to this object
        if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
            return types.MethodType(attr.__func__, self)

        # a descriptor, which would be invoked against a "self"; invoke
        # it against this object
        if hasattr(attr, "__get__"):
            attr = attr.__get__(None, self)

        # members of the QueryableAttribute system expect this call so
        # that they can be adapted; the result is stored on this object
        # so that subsequent lookups bypass __getattr__
        if hasattr(attr, "adapt_to_entity"):
            attr = attr.adapt_to_entity(_aliased_insp)
            setattr(self, key, attr)

        return attr

    def _get_from_serialized(
        self, key: str, mapped_class: _O, aliased_insp: AliasedInsp[_O]
    ) -> Any:
        # used only by the sqlalchemy.ext.serializer extension
        attr = getattr(mapped_class, key)

        # a bound method; rebind its function to this object
        if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
            return types.MethodType(attr.__func__, self)

        # a descriptor, which would be invoked against a "self"; invoke
        # it against this object
        if hasattr(attr, "__get__"):
            attr = attr.__get__(None, self)

        # members of the QueryableAttribute system expect this call so
        # that they can be adapted; the AliasedInsp is first pointed
        # back at this object
        if hasattr(attr, "adapt_to_entity"):
            aliased_insp._weak_entity = weakref.ref(self)
            attr = attr.adapt_to_entity(aliased_insp)
            setattr(self, key, attr)

        return attr

    def __repr__(self) -> str:
        target_name = self._aliased_insp._target.__name__
        return "<AliasedClass at 0x%x; %s>" % (id(self), target_name)

    def __str__(self) -> str:
        return str(self._aliased_insp)

852 

853 

854@inspection._self_inspects 

855class AliasedInsp( 

856 ORMEntityColumnsClauseRole[_O], 

857 ORMFromClauseRole, 

858 HasCacheKey, 

859 InspectionAttr, 

860 MemoizedSlots, 

861 inspection.Inspectable["AliasedInsp[_O]"], 

862 Generic[_O], 

863): 

864 """Provide an inspection interface for an 

865 :class:`.AliasedClass` object. 

866 

867 The :class:`.AliasedInsp` object is returned 

868 given an :class:`.AliasedClass` using the 

869 :func:`_sa.inspect` function:: 

870 

871 from sqlalchemy import inspect 

872 from sqlalchemy.orm import aliased 

873 

874 my_alias = aliased(MyMappedClass) 

875 insp = inspect(my_alias) 

876 

877 Attributes on :class:`.AliasedInsp` 

878 include: 

879 

880 * ``entity`` - the :class:`.AliasedClass` represented. 

881 * ``mapper`` - the :class:`_orm.Mapper` mapping the underlying class. 

882 * ``selectable`` - the :class:`_expression.Alias` 

883 construct which ultimately 

884 represents an aliased :class:`_schema.Table` or 

885 :class:`_expression.Select` 

886 construct. 

887 * ``name`` - the name of the alias. Also is used as the attribute 

888 name when returned in a result tuple from :class:`_query.Query`. 

889 * ``with_polymorphic_mappers`` - collection of :class:`_orm.Mapper` 

890 objects 

891 indicating all those mappers expressed in the select construct 

892 for the :class:`.AliasedClass`. 

893 * ``polymorphic_on`` - an alternate column or SQL expression which 

894 will be used as the "discriminator" for a polymorphic load. 

895 

896 .. seealso:: 

897 

898 :ref:`inspection_toplevel` 

899 

900 """ 

901 

902 __slots__ = ( 

903 "__weakref__", 

904 "_weak_entity", 

905 "mapper", 

906 "selectable", 

907 "name", 

908 "_adapt_on_names", 

909 "with_polymorphic_mappers", 

910 "polymorphic_on", 

911 "_use_mapper_path", 

912 "_base_alias", 

913 "represents_outer_join", 

914 "persist_selectable", 

915 "local_table", 

916 "_is_with_polymorphic", 

917 "_with_polymorphic_entities", 

918 "_adapter", 

919 "_target", 

920 "__clause_element__", 

921 "_memoized_values", 

922 "_all_column_expressions", 

923 "_nest_adapters", 

924 ) 

925 

926 _cache_key_traversal = [ 

927 ("name", visitors.ExtendedInternalTraversal.dp_string), 

928 ("_adapt_on_names", visitors.ExtendedInternalTraversal.dp_boolean), 

929 ("_use_mapper_path", visitors.ExtendedInternalTraversal.dp_boolean), 

930 ("_target", visitors.ExtendedInternalTraversal.dp_inspectable), 

931 ("selectable", visitors.ExtendedInternalTraversal.dp_clauseelement), 

932 ( 

933 "with_polymorphic_mappers", 

934 visitors.InternalTraversal.dp_has_cache_key_list, 

935 ), 

936 ("polymorphic_on", visitors.InternalTraversal.dp_clauseelement), 

937 ] 

938 

939 mapper: Mapper[_O] 

940 selectable: FromClause 

941 _adapter: ORMAdapter 

942 with_polymorphic_mappers: Sequence[Mapper[Any]] 

943 _with_polymorphic_entities: Sequence[AliasedInsp[Any]] 

944 

945 _weak_entity: weakref.ref[AliasedClass[_O]] 

946 """the AliasedClass that refers to this AliasedInsp""" 

947 

948 _target: Union[Type[_O], AliasedClass[_O]] 

949 """the thing referenced by the AliasedClass/AliasedInsp. 

950 

951 In the vast majority of cases, this is the mapped class. However 

952 it may also be another AliasedClass (alias of alias). 

953 

954 """ 

955 

def __init__(
    self,
    entity: AliasedClass[_O],
    inspected: _InternalEntityType[_O],
    selectable: FromClause,
    name: Optional[str],
    with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]],
    polymorphic_on: Optional[ColumnElement[Any]],
    _base_alias: Optional[AliasedInsp[Any]],
    _use_mapper_path: bool,
    adapt_on_names: bool,
    represents_outer_join: bool,
    nest_adapters: bool,
):
    """Populate the inspection state that backs an :class:`.AliasedClass`.

    :param entity: the :class:`.AliasedClass` this object describes; only
     a weak reference to it is kept.
    :param inspected: inspection object for the aliased target; supplies
     ``.mapper`` and ``.entity``.
    :param selectable: FROM clause of the alias; also stored as
     ``persist_selectable`` and ``local_table``.
    :param name: name of the alias, stored as-is.
    :param with_polymorphic_mappers: when non-empty, a sub-alias is
     created for every mapper other than the primary one and attached to
     the entity under the mapped class' ``__name__``.
    :param polymorphic_on: stored as ``polymorphic_on``.
    :param _base_alias: base :class:`.AliasedInsp`; when falsy, this
     object acts as its own base.
    :param _use_mapper_path: stored as-is and forwarded to sub-aliases.
    :param adapt_on_names: forwarded to the adapter and to sub-aliases.
    :param represents_outer_join: stored as-is.
    :param nest_adapters: when true, the adapter of ``inspected`` wraps
     the adapter built here ("aliased class of aliased class").
    """
    target = inspected.entity
    mapper = inspected.mapper

    self._weak_entity = weakref.ref(entity)
    self.mapper = mapper
    self.selectable = selectable
    self.persist_selectable = selectable
    self.local_table = selectable
    self.name = name
    self.polymorphic_on = polymorphic_on
    # the base alias is held weakly as well; default to ourselves
    self._base_alias = weakref.ref(_base_alias or self)
    self._use_mapper_path = _use_mapper_path
    self.represents_outer_join = represents_outer_join
    self._nest_adapters = nest_adapters

    if not with_polymorphic_mappers:
        self._is_with_polymorphic = False
        self.with_polymorphic_mappers = [mapper]
    else:
        self._is_with_polymorphic = True
        self.with_polymorphic_mappers = with_polymorphic_mappers
        self._with_polymorphic_entities = []
        for poly in with_polymorphic_mappers:
            if poly is mapper:
                # the primary mapper is represented by this object itself
                continue

            sub_alias = AliasedClass(
                poly.class_,
                selectable,
                base_alias=self,
                adapt_on_names=adapt_on_names,
                use_mapper_path=_use_mapper_path,
            )

            # expose the sub-alias on the entity by mapped class name
            setattr(self.entity, poly.class_.__name__, sub_alias)
            self._with_polymorphic_entities.append(sub_alias._aliased_insp)

    # make sure the adapter doesn't try to grab other tables that
    # are not even the thing we are mapping, such as embedded
    # selectables in subqueries or CTEs.  See issue #6060
    if adapt_on_names:
        source_selectables = set()
    else:
        source_selectables = {
            m.selectable for m in self.with_polymorphic_mappers
        }

    self._adapter = ORMAdapter(
        _TraceAdaptRole.ALIASED_INSP,
        mapper,
        selectable=selectable,
        equivalents=mapper._equivalent_columns,
        adapt_on_names=adapt_on_names,
        anonymize_labels=True,
        adapt_from_selectables=source_selectables,
        limit_on_entity=False,
    )

    if nest_adapters:
        # supports "aliased class of aliased class" use case
        assert isinstance(inspected, AliasedInsp)
        self._adapter = inspected._adapter.wrap(self._adapter)

    self._adapt_on_names = adapt_on_names
    self._target = target

1031 

@classmethod
def _alias_factory(
    cls,
    element: Union[_EntityType[_O], FromClause],
    alias: Optional[FromClause] = None,
    name: Optional[str] = None,
    flat: bool = False,
    adapt_on_names: bool = False,
) -> Union[AliasedClass[_O], FromClause]:
    """Produce an alias of either a Core FROM clause or an ORM entity.

    A :class:`.FromClause` is aliased directly: by name when ``name`` is
    truthy, otherwise anonymously through the coercion system.  Anything
    else is wrapped in an :class:`.AliasedClass`.

    :raises sa_exc.ArgumentError: if ``adapt_on_names`` is set while
     ``element`` is a plain :class:`.FromClause`.
    """
    if not isinstance(element, FromClause):
        return AliasedClass(
            element,
            alias=alias,
            flat=flat,
            name=name,
            adapt_on_names=adapt_on_names,
        )

    if adapt_on_names:
        raise sa_exc.ArgumentError(
            "adapt_on_names only applies to ORM elements"
        )

    if name:
        return element.alias(name=name, flat=flat)

    return coercions.expect(
        roles.AnonymizedFromClauseRole, element, flat=flat
    )

1060 

@classmethod
def _with_polymorphic_factory(
    cls,
    base: Union[Type[_O], Mapper[_O]],
    classes: Union[Literal["*"], Iterable[_EntityType[Any]]],
    selectable: Union[Literal[False, None], FromClause] = False,
    flat: bool = False,
    polymorphic_on: Optional[ColumnElement[Any]] = None,
    aliased: bool = False,
    innerjoin: bool = False,
    adapt_on_names: bool = False,
    name: Optional[str] = None,
    _use_mapper_path: bool = False,
) -> AliasedClass[_O]:
    """Build the :class:`.AliasedClass` behind ``with_polymorphic()``.

    The mappers and selectable are resolved by the primary mapper's
    ``_with_polymorphic_args()``; when ``aliased`` or ``flat`` is set the
    resulting selectable is replaced by an anonymous alias of itself.

    :raises sa_exc.ArgumentError: if both ``flat`` and an explicit
     ``selectable`` are given.
    """
    primary_mapper = _class_to_mapper(base)

    if selectable not in (None, False) and flat:
        raise sa_exc.ArgumentError(
            "the 'flat' and 'selectable' arguments cannot be passed "
            "simultaneously to with_polymorphic()"
        )

    poly_mappers, poly_selectable = primary_mapper._with_polymorphic_args(
        classes, selectable, innerjoin=innerjoin
    )

    if aliased or flat:
        assert poly_selectable is not None
        poly_selectable = poly_selectable._anonymous_fromclause(flat=flat)

    return AliasedClass(
        base,
        poly_selectable,
        name=name,
        with_polymorphic_mappers=poly_mappers,
        adapt_on_names=adapt_on_names,
        with_polymorphic_discriminator=polymorphic_on,
        use_mapper_path=_use_mapper_path,
        represents_outer_join=not innerjoin,
    )

1100 

@property
def entity(self) -> AliasedClass[_O]:
    """Return the :class:`.AliasedClass` this object belongs to.

    The AliasedClass is only referenced weakly so that no reference cycle
    is formed; it can therefore disappear, notably when it was created
    internally and only the AliasedInsp was handed around.  In that case
    a replacement is rebuilt from this object, which is cheap because
    the class carries very little initial state, and the weak reference
    is pointed at the replacement.
    """
    live = self._weak_entity()
    if live is not None:
        return live

    rebuilt = AliasedClass._reconstitute_from_aliased_insp(self)
    self._weak_entity = weakref.ref(rebuilt)
    return rebuilt

is_aliased_class = True
"always returns True"

1117 

1118 def _memoized_method___clause_element__(self) -> FromClause: 

1119 return self.selectable._annotate( 

1120 { 

1121 "parentmapper": self.mapper, 

1122 "parententity": self, 

1123 "entity_namespace": self, 

1124 } 

1125 )._set_propagate_attrs( 

1126 {"compile_state_plugin": "orm", "plugin_subject": self} 

1127 ) 

1128 

@property
def entity_namespace(self) -> AliasedClass[_O]:
    """Namespace for attribute lookups; this is simply ``self.entity``."""
    namespace = self.entity
    return namespace

1132 

@property
def class_(self) -> Type[_O]:
    """Return the mapped class ultimately represented by this
    :class:`.AliasedInsp`, i.e. the ``class_`` of its mapper."""
    mapped_class = self.mapper.class_
    return mapped_class

1138 

1139 @property 

1140 def _path_registry(self) -> AbstractEntityRegistry: 

1141 if self._use_mapper_path: 

1142 return self.mapper._path_registry 

1143 else: 

1144 return PathRegistry.per_mapper(self) 

1145 

1146 def __getstate__(self) -> Dict[str, Any]: 

1147 return { 

1148 "entity": self.entity, 

1149 "mapper": self.mapper, 

1150 "alias": self.selectable, 

1151 "name": self.name, 

1152 "adapt_on_names": self._adapt_on_names, 

1153 "with_polymorphic_mappers": self.with_polymorphic_mappers, 

1154 "with_polymorphic_discriminator": self.polymorphic_on, 

1155 "base_alias": self._base_alias(), 

1156 "use_mapper_path": self._use_mapper_path, 

1157 "represents_outer_join": self.represents_outer_join, 

1158 "nest_adapters": self._nest_adapters, 

1159 } 

1160 

1161 def __setstate__(self, state: Dict[str, Any]) -> None: 

1162 self.__init__( # type: ignore 

1163 state["entity"], 

1164 state["mapper"], 

1165 state["alias"], 

1166 state["name"], 

1167 state["with_polymorphic_mappers"], 

1168 state["with_polymorphic_discriminator"], 

1169 state["base_alias"], 

1170 state["use_mapper_path"], 

1171 state["adapt_on_names"], 

1172 state["represents_outer_join"], 

1173 state["nest_adapters"], 

1174 ) 

1175 

def _merge_with(self, other: AliasedInsp[_O]) -> AliasedInsp[_O]:
    """Combine the polymorphic class sets of this object and ``other``.

    Both objects must share the same primary mapper.  When they already
    cover the same set of classes, ``other`` is returned unchanged.
    Otherwise a new flat, anonymous with-polymorphic alias covering the
    union of classes is built, taking the discriminator, mapper-path
    flag and outer-join flag from ``other``.
    """
    # NOTE: both sides are expected to be with_polymorphic constructs,
    # but this is not asserted here.
    primary_mapper = other.mapper

    assert self.mapper is primary_mapper

    ours = util.to_set(mp.class_ for mp in self.with_polymorphic_mappers)
    theirs = {mp.class_ for mp in other.with_polymorphic_mappers}
    if ours == theirs:
        return other

    merged_classes = ours.union(theirs)
    merged_mappers, merged_selectable = (
        primary_mapper._with_polymorphic_args(
            merged_classes,
            None,
            innerjoin=not other.represents_outer_join,
        )
    )
    merged_selectable = merged_selectable._anonymous_fromclause(flat=True)

    merged_alias = AliasedClass(
        primary_mapper,
        merged_selectable,
        with_polymorphic_mappers=merged_mappers,
        with_polymorphic_discriminator=other.polymorphic_on,
        use_mapper_path=other._use_mapper_path,
        represents_outer_join=other.represents_outer_join,
    )
    return merged_alias._aliased_insp

1205 

def _adapt_element(
    self, expr: _ORMCOLEXPR, key: Optional[str] = None
) -> _ORMCOLEXPR:
    """Adapt a column expression so that it refers to this alias.

    The expression is run through this object's adapter, annotated with
    ``parententity`` / ``parentmapper`` (plus ``proxy_key`` when ``key``
    is truthy) and flagged to compile with the "orm" plugin.
    """
    assert isinstance(expr, ColumnElement)

    annotations: Dict[str, Any] = {
        "parententity": self,
        "parentmapper": self.mapper,
    }
    if key:
        annotations["proxy_key"] = key

    adapted = self._adapter.traverse(expr)

    # IMO mypy should see this one also as returning the same type
    # we put into it, but it's not
    return adapted._annotate(annotations)._set_propagate_attrs(
        {"compile_state_plugin": "orm", "plugin_subject": self}
    )

if TYPE_CHECKING:
    # establish compatibility with the _ORMAdapterProto protocol,
    # which in turn is compatible with _CoreAdapterProto.

    def _orm_adapt_element(
        self,
        obj: _CE,
        key: Optional[str] = None,
    ) -> _CE: ...

else:
    # at runtime the protocol method is the very same function
    _orm_adapt_element = _adapt_element

1239 

1240 def _entity_for_mapper(self, mapper): 

1241 self_poly = self.with_polymorphic_mappers 

1242 if mapper in self_poly: 

1243 if mapper is self.mapper: 

1244 return self 

1245 else: 

1246 return getattr( 

1247 self.entity, mapper.class_.__name__ 

1248 )._aliased_insp 

1249 elif mapper.isa(self.mapper): 

1250 return self 

1251 else: 

1252 assert False, "mapper %s doesn't correspond to %s" % (mapper, self) 

1253 

1254 def _memoized_attr__get_clause(self): 

1255 onclause, replacemap = self.mapper._get_clause 

1256 return ( 

1257 self._adapter.traverse(onclause), 

1258 { 

1259 self._adapter.traverse(col): param 

1260 for col, param in replacemap.items() 

1261 }, 

1262 ) 

1263 

1264 def _memoized_attr__memoized_values(self): 

1265 return {} 

1266 

def _memoized_attr__all_column_expressions(self):
    """Return a ``ColumnCollection`` of every mapped column, adapted.

    The (key, column) pairs come from the mapper's
    ``_columns_plus_keys()``; for a with-polymorphic alias the mappers of
    the polymorphic sub-entities are passed along.  Each column is
    adapted to this alias via ``_adapt_element()``.
    """
    if not self._is_with_polymorphic:
        raw_pairs = self.mapper._columns_plus_keys()
    else:
        poly_mappers = [ent.mapper for ent in self._with_polymorphic_entities]
        raw_pairs = self.mapper._columns_plus_keys(poly_mappers)

    adapted_pairs = [
        (key, self._adapt_element(col)) for key, col in raw_pairs
    ]
    return ColumnCollection(adapted_pairs)

1280 

1281 def _memo(self, key, callable_, *args, **kw): 

1282 if key in self._memoized_values: 

1283 return self._memoized_values[key] 

1284 else: 

1285 self._memoized_values[key] = value = callable_(*args, **kw) 

1286 return value 

1287 

1288 def __repr__(self): 

1289 if self.with_polymorphic_mappers: 

1290 with_poly = "(%s)" % ", ".join( 

1291 mp.class_.__name__ for mp in self.with_polymorphic_mappers 

1292 ) 

1293 else: 

1294 with_poly = "" 

1295 return "<AliasedInsp at 0x%x; %s%s>" % ( 

1296 id(self), 

1297 self.class_.__name__, 

1298 with_poly, 

1299 ) 

1300 

1301 def __str__(self): 

1302 if self._is_with_polymorphic: 

1303 return "with_polymorphic(%s, [%s])" % ( 

1304 self._target.__name__, 

1305 ", ".join( 

1306 mp.class_.__name__ 

1307 for mp in self.with_polymorphic_mappers 

1308 if mp is not self.mapper 

1309 ), 

1310 ) 

1311 else: 

1312 return "aliased(%s)" % (self._target.__name__,) 

1313 

1314 

class _WrapUserEntity:
    """Proxy handed to ``loader_criteria`` lambdas in place of the entity.

    Attribute access is forwarded to the wrapped subject, except that
    ``declared_attr`` objects found directly in the subject's
    ``__dict__`` are invoked through their ``fget`` instead of through
    the descriptor protocol.  This bypasses declared_attr descriptors on
    unmapped mixins, which normally emit a warning for such use.

    might also be useful for other per-lambda instrumentations should
    the need arise.

    """

    __slots__ = ("subject",)

    def __init__(self, subject):
        self.subject = subject

    @util.preload_module("sqlalchemy.orm.decl_api")
    def __getattribute__(self, name):
        decl_api = util.preloaded.orm.decl_api

        # go through object.__getattribute__ so that fetching "subject"
        # does not recurse into this very method
        subject = object.__getattribute__(self, "subject")
        own_attrs = subject.__dict__

        is_declared_attr = name in own_attrs and isinstance(
            own_attrs[name], decl_api.declared_attr
        )
        if not is_declared_attr:
            return getattr(subject, name)
        return own_attrs[name].fget(subject)

1341 

1342 

class LoaderCriteriaOption(CriteriaOption):
    """Add additional WHERE criteria to the load for all occurrences of
    a particular entity.

    :class:`_orm.LoaderCriteriaOption` is invoked using the
    :func:`_orm.with_loader_criteria` function; see that function for
    details.

    .. versionadded:: 1.4

    """

    __slots__ = (
        "root_entity",
        "entity",
        "deferred_where_criteria",
        "where_criteria",
        "_where_crit_orig",
        "include_aliases",
        "propagate_to_loaders",
    )

    _traverse_internals = [
        ("root_entity", visitors.ExtendedInternalTraversal.dp_plain_obj),
        ("entity", visitors.ExtendedInternalTraversal.dp_has_cache_key),
        ("where_criteria", visitors.InternalTraversal.dp_clauseelement),
        ("include_aliases", visitors.InternalTraversal.dp_boolean),
        ("propagate_to_loaders", visitors.InternalTraversal.dp_boolean),
    ]

    root_entity: Optional[Type[Any]]
    entity: Optional[_InternalEntityType[Any]]
    where_criteria: Union[ColumnElement[bool], lambdas.DeferredLambdaElement]
    deferred_where_criteria: bool
    include_aliases: bool
    propagate_to_loaders: bool

    _where_crit_orig: Any

    def __init__(
        self,
        entity_or_base: _EntityType[Any],
        where_criteria: Union[
            _ColumnExpressionArgument[bool],
            Callable[[Any], _ColumnExpressionArgument[bool]],
        ],
        loader_only: bool = False,
        include_aliases: bool = False,
        propagate_to_loaders: bool = True,
        track_closure_variables: bool = True,
    ):
        """Construct the option.

        :param entity_or_base: an inspectable entity, or a plain class;
         when inspection yields nothing the class is kept as
         ``root_entity`` and its mapped subclasses are located later.
        :param where_criteria: a SQL expression, or a callable that
         receives the entity and returns one; a callable is wrapped in a
         deferred lambda element.
        :param loader_only: accepted but not referenced by this
         constructor.
        :param include_aliases: stored as-is.
        :param propagate_to_loaders: stored as-is.
        :param track_closure_variables: forwarded to the lambda options
         when ``where_criteria`` is a callable.
        """
        inspected = cast(
            "_InternalEntityType[Any]",
            inspection.inspect(entity_or_base, False),
        )
        if inspected is not None:
            self.root_entity = None
            self.entity = inspected
        else:
            self.root_entity = cast("Type[Any]", entity_or_base)
            self.entity = None

        # the untouched criteria is kept for pickling, see __reduce__()
        self._where_crit_orig = where_criteria

        if not callable(where_criteria):
            self.deferred_where_criteria = False
            self.where_criteria = coercions.expect(
                roles.WhereHavingRole, where_criteria
            )
        else:
            if self.root_entity is not None:
                wrap_entity = self.root_entity
            else:
                assert inspected is not None
                wrap_entity = inspected.entity

            self.deferred_where_criteria = True
            self.where_criteria = lambdas.DeferredLambdaElement(
                where_criteria,
                roles.WhereHavingRole,
                lambda_args=(_WrapUserEntity(wrap_entity),),
                opts=lambdas.LambdaOptions(
                    track_closure_variables=track_closure_variables
                ),
            )

        self.include_aliases = include_aliases
        self.propagate_to_loaders = propagate_to_loaders

    @classmethod
    def _unreduce(
        cls, entity, where_criteria, include_aliases, propagate_to_loaders
    ):
        """Rebuild an option from the arguments emitted by ``__reduce__``."""
        return LoaderCriteriaOption(
            entity,
            where_criteria,
            include_aliases=include_aliases,
            propagate_to_loaders=propagate_to_loaders,
        )

    def __reduce__(self):
        """Pickle as a call to ``_unreduce`` with the original arguments.

        The entity is reduced to its mapped class, or to ``root_entity``
        when no inspected entity is present.
        """
        if self.entity:
            pickled_entity = self.entity.class_
        else:
            pickled_entity = self.root_entity

        rebuild_args = (
            pickled_entity,
            self._where_crit_orig,
            self.include_aliases,
            self.propagate_to_loaders,
        )
        return (LoaderCriteriaOption._unreduce, rebuild_args)

    def _all_mappers(self) -> Iterator[Mapper[Any]]:
        """Yield every mapper that the criteria applies to.

        For an inspected entity these are its mapper and all descendant
        mappers.  For an uninspectable root class, the subclass tree is
        walked breadth-first; an inspectable subclass contributes its
        mapper and descendants, any other subclass has its own
        subclasses queued instead.
        """
        if self.entity:
            yield from self.entity.mapper.self_and_descendants
            return

        assert self.root_entity
        pending = list(self.root_entity.__subclasses__())
        while pending:
            subclass = pending.pop(0)
            inspected = cast(
                "_InternalEntityType[Any]",
                inspection.inspect(subclass, raiseerr=False),
            )
            if not inspected:
                pending.extend(subclass.__subclasses__())
                continue
            yield from inspected.mapper.self_and_descendants

    def _should_include(self, compile_state: ORMCompileState) -> bool:
        """Return False if the statement was annotated by this very option.

        Such a statement carries this object under the
        ``"for_loader_criteria"`` annotation key.
        """
        annotated_for = compile_state.select_statement._annotations.get(
            "for_loader_criteria", None
        )
        return annotated_for is not self

    def _resolve_where_criteria(
        self, ext_info: _InternalEntityType[Any]
    ) -> ColumnElement[bool]:
        """Return the criteria for ``ext_info``, deep-annotated with
        ``{"for_loader_criteria": self}``.

        Deferred (lambda) criteria are resolved against
        ``ext_info.entity`` first.
        """
        if not self.deferred_where_criteria:
            crit = self.where_criteria  # type: ignore
        else:
            crit = cast(
                "ColumnElement[bool]",
                self.where_criteria._resolve_with_args(ext_info.entity),
            )

        assert isinstance(crit, ColumnElement)
        return sql_util._deep_annotate(
            crit,
            {"for_loader_criteria": self},
            detect_subquery_cols=True,
            ind_cols_on_fromclause=True,
        )

    def process_compile_state_replaced_entities(
        self,
        compile_state: ORMCompileState,
        mapper_entities: Iterable[_MapperEntity],
    ) -> None:
        """Delegate to ``process_compile_state()``; ``mapper_entities``
        is not used."""
        self.process_compile_state(compile_state)

    def process_compile_state(self, compile_state: ORMCompileState) -> None:
        """Apply a modification to a given :class:`.CompileState`."""

        # if options to limit the criteria to immediate query only,
        # use compile_state.attributes instead

        self.get_global_criteria(compile_state.global_attributes)

    def get_global_criteria(self, attributes: Dict[Any, Any]) -> None:
        """Register this option in ``attributes`` for every mapper from
        ``_all_mappers()``, under ``("additional_entity_criteria", mapper)``.
        """
        for mp in self._all_mappers():
            criteria_key = ("additional_entity_criteria", mp)
            attributes.setdefault(criteria_key, []).append(self)

1520 

1521 

# inspecting an AliasedClass yields the AliasedInsp stored on it
inspection._inspects(AliasedClass)(lambda target: target._aliased_insp)


@inspection._inspects(type)
def _inspect_mc(
    class_: Type[_O],
) -> Optional[Mapper[_O]]:
    """Return the mapper of ``class_``, or ``None`` if it is not mapped.

    ``None`` is returned when the class has no class manager, when the
    manager reports ``is_mapped`` as false, or when one of the
    ``exc.NO_STATE`` exceptions is raised during the lookup.
    """
    try:
        class_manager = opt_manager_of_class(class_)
        if class_manager is not None and class_manager.is_mapped:
            return class_manager.mapper
    except exc.NO_STATE:
        pass
    return None

1538 

1539 

# runtime type of subscripted typing generics such as ``List[Any]``
GenericAlias = type(List[Any])


@inspection._inspects(GenericAlias)
def _inspect_generic_alias(
    class_: Type[_O],
) -> Optional[Mapper[_O]]:
    """Inspect a subscripted generic by inspecting its origin class.

    The origin is obtained with ``typing_get_origin()`` and handed to
    ``_inspect_mc()``, whose result is returned unchanged.
    """
    origin_class = cast("Type[_O]", typing_get_origin(class_))
    mapper_or_none = _inspect_mc(origin_class)
    return mapper_or_none

1549 

1550 

@inspection._self_inspects
class Bundle(
    ORMColumnsClauseRole[_T],
    SupportsCloneAnnotations,
    MemoizedHasCacheKey,
    inspection.Inspectable["Bundle[_T]"],
    InspectionAttr,
):
    """A grouping of SQL expressions that are returned by a :class:`.Query`
    under one namespace.

    The :class:`.Bundle` essentially allows nesting of the tuple-based
    results returned by a column-oriented :class:`_query.Query` object.
    It also
    is extensible via simple subclassing, where the primary capability
    to override is that of how the set of expressions should be returned,
    allowing post-processing as well as custom return types, without
    involving ORM identity-mapped classes.

    .. seealso::

        :ref:`bundles`

    """

    single_entity = False
    """If True, queries for a single Bundle will be returned as a single
    entity, rather than an element within a keyed tuple."""

    is_clause_element = False

    is_mapper = False

    is_aliased_class = False

    is_bundle = True

    _propagate_attrs: _PropagateAttrsType = util.immutabledict()

    proxy_set = util.EMPTY_SET  # type: ignore

    exprs: List[_ColumnsClauseElement]

    def __init__(
        self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
    ):
        r"""Construct a new :class:`.Bundle`.

        e.g.::

            bn = Bundle("mybundle", MyClass.x, MyClass.y)

            for row in session.query(bn).filter(
                    bn.c.x == 5).filter(bn.c.y == 4):
                print(row.mybundle.x, row.mybundle.y)

        :param name: name of the bundle.
        :param \*exprs: columns or SQL expressions comprising the bundle.
        :param single_entity=False: if True, rows for this :class:`.Bundle`
         can be returned as a "single entity" outside of any enclosing tuple
         in the same manner as a mapped entity.

        """
        # name and _label always travel together; see label()
        self.name = self._label = name
        coerced_exprs = [
            coercions.expect(
                roles.ColumnsClauseRole, expr, apply_propagate_attrs=self
            )
            for expr in exprs
        ]
        self.exprs = coerced_exprs

        # a nested Bundle is coerced to an annotated clause list; unwrap
        # it so that the Bundle itself is what the namespace exposes
        members = [e._annotations.get("bundle", e) for e in coerced_exprs]
        self.c = self.columns = ColumnCollection(
            (getattr(col, "key", col._label), col) for col in members
        ).as_readonly()
        self.single_entity = kw.pop("single_entity", self.single_entity)

    def _gen_cache_key(
        self, anon_map: anon_map, bindparams: List[BindParameter[Any]]
    ) -> Tuple[Any, ...]:
        """Return a cache key built from the class, the name, the
        ``single_entity`` flag and the cache key of every expression."""
        return (self.__class__, self.name, self.single_entity) + tuple(
            [expr._gen_cache_key(anon_map, bindparams) for expr in self.exprs]
        )

    @property
    def mapper(self) -> Optional[Mapper[Any]]:
        """The ``parentmapper`` annotation of the first expression, or
        ``None`` if that expression has no such annotation."""
        mp: Optional[Mapper[Any]] = self.exprs[0]._annotations.get(
            "parentmapper", None
        )
        return mp

    @property
    def entity(self) -> Optional[_InternalEntityType[Any]]:
        """The ``parententity`` annotation of the first expression, or
        ``None`` if that expression has no such annotation."""
        ie: Optional[_InternalEntityType[Any]] = self.exprs[
            0
        ]._annotations.get("parententity", None)
        return ie

    @property
    def entity_namespace(
        self,
    ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]:
        """The namespace used for attribute lookups; same as :attr:`.c`."""
        return self.c

    columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]

    """A namespace of SQL expressions referred to by this :class:`.Bundle`.

        e.g.::

            bn = Bundle("mybundle", MyClass.x, MyClass.y)

            q = sess.query(bn).filter(bn.c.x == 5)

        Nesting of bundles is also supported::

            b1 = Bundle("b1",
                    Bundle('b2', MyClass.a, MyClass.b),
                    Bundle('b3', MyClass.x, MyClass.y)
                )

            q = sess.query(b1).filter(
                b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9)

    .. seealso::

        :attr:`.Bundle.c`

    """

    c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
    """An alias for :attr:`.Bundle.columns`."""

    def _clone(self):
        """Return a shallow copy created without invoking ``__init__``;
        the instance ``__dict__`` is copied as-is."""
        cloned = self.__class__.__new__(self.__class__)
        cloned.__dict__.update(self.__dict__)
        return cloned

    def __clause_element__(self):
        """Return an ungrouped clause list of the bundle's expressions.

        The list is annotated with this bundle and flagged to compile
        with the "orm" plugin.
        """
        # ensure existing entity_namespace remains
        annotations = {"bundle": self, "entity_namespace": self}
        annotations.update(self._annotations)

        plugin_subject = self.exprs[0]._propagate_attrs.get(
            "plugin_subject", self.entity
        )
        return (
            expression.ClauseList(
                *[e._annotations.get("bundle", e) for e in self.exprs],
                _literal_as_text_role=roles.ColumnsClauseRole,
                group=False,
            )
            ._annotate(annotations)
            ._set_propagate_attrs(
                # the Bundle *must* use the orm plugin no matter what.  the
                # subject can be None but it's much better if it's not.
                {
                    "compile_state_plugin": "orm",
                    "plugin_subject": plugin_subject,
                }
            )
        )

    @property
    def clauses(self):
        """The ``clauses`` of the list built by ``__clause_element__()``."""
        return self.__clause_element__().clauses

    def label(self, name):
        """Provide a copy of this :class:`.Bundle` passing a new label."""

        cloned = self._clone()
        # keep _label in step with name, as __init__ does; _label is the
        # key under which a nested Bundle appears in its parent's columns
        cloned.name = cloned._label = name
        return cloned

    def create_row_processor(
        self,
        query: Select[Unpack[TupleAny]],
        procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]],
        labels: Sequence[str],
    ) -> Callable[[Row[Unpack[TupleAny]]], Any]:
        """Produce the "row processing" function for this :class:`.Bundle`.

        May be overridden by subclasses to provide custom behaviors when
        results are fetched. The method is passed the statement object and a
        set of "row processor" functions at query execution time; these
        processor functions when given a result row will return the individual
        attribute value, which can then be adapted into any kind of return data
        structure.

        The example below illustrates replacing the usual :class:`.Row`
        return structure with a straight Python dictionary::

            from sqlalchemy.orm import Bundle

            class DictBundle(Bundle):
                def create_row_processor(self, query, procs, labels):
                    '''Override create_row_processor to return values as
                    dictionaries'''

                    def proc(row):
                        return dict(
                            zip(labels, (proc(row) for proc in procs))
                        )
                    return proc

        A result from the above :class:`_orm.Bundle` will return dictionary
        values::

            bn = DictBundle('mybundle', MyClass.data1, MyClass.data2)
            for row in session.execute(
                select(bn).where(bn.c.data1 == 'd1')
            ):
                print(row.mybundle['data1'], row.mybundle['data2'])

        """
        keyed_tuple = result_tuple(labels, [() for _ in labels])

        def proc(row: Row[Unpack[TupleAny]]) -> Any:
            return keyed_tuple([col_proc(row) for col_proc in procs])

        return proc

1772 

1773 

def _orm_annotate(element: _SA, exclude: Optional[Any] = None) -> _SA:
    """Deep copy the given ClauseElement, annotating each element with the
    "_orm_adapt" flag.

    Elements within the exclude collection will be cloned but not annotated.

    """
    orm_adapt_flag = {"_orm_adapt": True}
    return sql_util._deep_annotate(element, orm_adapt_flag, exclude)

1782 

1783 

def _orm_deannotate(element: _SA) -> _SA:
    """Remove annotations that link a column to a particular mapping.

    Only the ``"_orm_adapt"`` and ``"parententity"`` annotations are
    removed.  Note this doesn't affect "remote" and "foreign"
    annotations passed by the :func:`_orm.foreign` and
    :func:`_orm.remote` annotators.

    """
    mapping_annotations = ("_orm_adapt", "parententity")
    return sql_util._deep_deannotate(element, values=mapping_annotations)

1796 

1797 

def _orm_full_deannotate(element: _SA) -> _SA:
    """Return ``element`` deep-deannotated, with no restriction on which
    annotation keys are removed."""
    deannotated = sql_util._deep_deannotate(element)
    return deannotated

1800 

1801 

1802class _ORMJoin(expression.Join): 

1803 """Extend Join to support ORM constructs as input.""" 

1804 

1805 __visit_name__ = expression.Join.__visit_name__ 

1806 

1807 inherit_cache = True 

1808 

1809 def __init__( 

1810 self, 

1811 left: _FromClauseArgument, 

1812 right: _FromClauseArgument, 

1813 onclause: Optional[_OnClauseArgument] = None, 

1814 isouter: bool = False, 

1815 full: bool = False, 

1816 _left_memo: Optional[Any] = None, 

1817 _right_memo: Optional[Any] = None, 

1818 _extra_criteria: Tuple[ColumnElement[bool], ...] = (), 

1819 ): 

1820 left_info = cast( 

1821 "Union[FromClause, _InternalEntityType[Any]]", 

1822 inspection.inspect(left), 

1823 ) 

1824 

1825 right_info = cast( 

1826 "Union[FromClause, _InternalEntityType[Any]]", 

1827 inspection.inspect(right), 

1828 ) 

1829 adapt_to = right_info.selectable 

1830 

1831 # used by joined eager loader 

1832 self._left_memo = _left_memo 

1833 self._right_memo = _right_memo 

1834 

1835 if isinstance(onclause, attributes.QueryableAttribute): 

1836 if TYPE_CHECKING: 

1837 assert isinstance( 

1838 onclause.comparator, RelationshipProperty.Comparator 

1839 ) 

1840 on_selectable = onclause.comparator._source_selectable() 

1841 prop = onclause.property 

1842 _extra_criteria += onclause._extra_criteria 

1843 elif isinstance(onclause, MapperProperty): 

1844 # used internally by joined eager loader...possibly not ideal 

1845 prop = onclause 

1846 on_selectable = prop.parent.selectable 

1847 else: 

1848 prop = None 

1849 on_selectable = None 

1850 

1851 left_selectable = left_info.selectable 

1852 if prop: 

1853 adapt_from: Optional[FromClause] 

1854 if sql_util.clause_is_present(on_selectable, left_selectable): 

1855 adapt_from = on_selectable 

1856 else: 

1857 assert isinstance(left_selectable, FromClause) 

1858 adapt_from = left_selectable 

1859 

1860 ( 

1861 pj, 

1862 sj, 

1863 source, 

1864 dest, 

1865 secondary, 

1866 target_adapter, 

1867 ) = prop._create_joins( 

1868 source_selectable=adapt_from, 

1869 dest_selectable=adapt_to, 

1870 source_polymorphic=True, 

1871 of_type_entity=right_info, 

1872 alias_secondary=True, 

1873 extra_criteria=_extra_criteria, 

1874 ) 

1875 

1876 if sj is not None: 

1877 if isouter: 

1878 # note this is an inner join from secondary->right 

1879 right = sql.join(secondary, right, sj) 

1880 onclause = pj 

1881 else: 

1882 left = sql.join(left, secondary, pj, isouter) 

1883 onclause = sj 

1884 else: 

1885 onclause = pj 

1886 

1887 self._target_adapter = target_adapter 

1888 

1889 # we don't use the normal coercions logic for _ORMJoin 

1890 # (probably should), so do some gymnastics to get the entity. 

1891 # logic here is for #8721, which was a major bug in 1.4 

1892 # for almost two years, not reported/fixed until 1.4.43 (!) 

1893 if is_selectable(left_info): 

1894 parententity = left_selectable._annotations.get( 

1895 "parententity", None 

1896 ) 

1897 elif insp_is_mapper(left_info) or insp_is_aliased_class(left_info): 

1898 parententity = left_info 

1899 else: 

1900 parententity = None 

1901 

1902 if parententity is not None: 

1903 self._annotations = self._annotations.union( 

1904 {"parententity": parententity} 

1905 ) 

1906 

1907 augment_onclause = bool(_extra_criteria) and not prop 

1908 expression.Join.__init__(self, left, right, onclause, isouter, full) 

1909 

1910 assert self.onclause is not None 

1911 

1912 if augment_onclause: 

1913 self.onclause &= sql.and_(*_extra_criteria) 

1914 

1915 if ( 

1916 not prop 

1917 and getattr(right_info, "mapper", None) 

1918 and right_info.mapper.single # type: ignore 

1919 ): 

1920 right_info = cast("_InternalEntityType[Any]", right_info) 

1921 # if single inheritance target and we are using a manual 

1922 # or implicit ON clause, augment it the same way we'd augment the 

1923 # WHERE. 

1924 single_crit = right_info.mapper._single_table_criterion 

1925 if single_crit is not None: 

1926 if insp_is_aliased_class(right_info): 

1927 single_crit = right_info._adapter.traverse(single_crit) 

1928 self.onclause = self.onclause & single_crit 

1929 

def _splice_into_center(self, other):
    """Splice a join into the center.

    Given join(a, b) and join(b, c), return join(a, b).join(c)

    :param other: the join whose leftmost element must be this join's
     right side.
    :return: a new ``_ORMJoin`` built from both joins.

    """
    # descend through the left side of ``other`` until we reach
    # something that is no longer a Join
    cursor = other
    while isinstance(cursor, sql.Join):
        cursor = cursor.left

    # the two joins must share their middle element
    assert self.right is cursor

    # rebuild this join so that its right side is other's left side,
    # keeping our own ON clause, outer flag and left memo
    rebuilt_left = _ORMJoin(
        self.left,
        other.left,
        self.onclause,
        isouter=self.isouter,
        _left_memo=self._left_memo,
        _right_memo=None,
    )

    # then attach other's right side using other's own ON clause
    return _ORMJoin(
        rebuilt_left,
        other.right,
        other.onclause,
        isouter=other.isouter,
        _right_memo=other._right_memo,
    )

1958 

def join(
    self,
    right: _FromClauseArgument,
    onclause: Optional[_OnClauseArgument] = None,
    isouter: bool = False,
    full: bool = False,
) -> _ORMJoin:
    """Return a new ``_ORMJoin`` that has this join as its left side.

    :param right: the right side of the new join.
    :param onclause: optional ON clause, passed through as given.
    :param isouter: passed through as the ``isouter`` flag.
    :param full: passed through as the ``full`` flag.

    """
    joined = _ORMJoin(self, right, onclause, isouter=isouter, full=full)
    return joined

1967 

def outerjoin(
    self,
    right: _FromClauseArgument,
    onclause: Optional[_OnClauseArgument] = None,
    full: bool = False,
) -> _ORMJoin:
    """Return a new ``_ORMJoin`` with this join as its left side and
    ``isouter`` set to True.

    :param right: the right side of the new join.
    :param onclause: optional ON clause, passed through as given.
    :param full: passed through as the ``full`` flag.

    """
    outer_joined = _ORMJoin(self, right, onclause, full=full, isouter=True)
    return outer_joined

1975 

1976 

def with_parent(
    instance: object,
    prop: attributes.QueryableAttribute[Any],
    from_entity: Optional[_EntityType[Any]] = None,
) -> ColumnElement[bool]:
    """Create filtering criterion that relates this query's primary entity
    to the given related instance, using established
    :func:`_orm.relationship()`
    configuration.

    E.g.::

        stmt = select(Address).where(with_parent(some_user, User.addresses))


    The SQL rendered is the same as that rendered when a lazy loader
    would fire off from the given parent on that attribute, meaning
    that the appropriate state is taken from the parent object in
    Python without the need to render joins to the parent table
    in the rendered statement.

    The given property may also make use of :meth:`_orm.PropComparator.of_type`
    to indicate the left side of the criteria::


        a1 = aliased(Address)
        a2 = aliased(Address)
        stmt = select(a1, a2).where(
            with_parent(u1, User.addresses.of_type(a2))
        )

    The above use is equivalent to using the
    :paramref:`_orm.with_parent.from_entity` argument::

        a1 = aliased(Address)
        a2 = aliased(Address)
        stmt = select(a1, a2).where(
            with_parent(u1, User.addresses, from_entity=a2)
        )

    :param instance:
      An instance which has some :func:`_orm.relationship`.

    :param prop:
      Class-bound attribute, which indicates
      what relationship from the instance should be used to reconcile the
      parent/child relationship.  Passing a string raises
      ``ArgumentError``, as does a class-bound attribute whose property
      is not a relationship.

    :param from_entity:
      Entity in which to consider as the left side.  This defaults to the
      "zero" entity of the :class:`_query.Query` itself.  It is replaced
      by the ``of_type()`` entity of ``prop`` when one is present.

      .. versionadded:: 1.2

    """
    relationship_prop: RelationshipProperty[Any]

    # strings are rejected outright
    if isinstance(prop, str):
        raise sa_exc.ArgumentError(
            "with_parent() accepts class-bound mapped attributes, not strings"
        )

    if not isinstance(prop, attributes.QueryableAttribute):
        # anything else is used as the relationship property directly
        relationship_prop = prop
        return relationship_prop._with_parent(
            instance, from_entity=from_entity
        )

    # an of_type() on the attribute takes precedence over from_entity
    if prop._of_type:
        from_entity = prop._of_type

    mapper_property = prop.property
    is_relationship = mapper_property is not None and prop_is_relationship(
        mapper_property
    )
    if not is_relationship:
        raise sa_exc.ArgumentError(
            f"Expected relationship property for with_parent(), "
            f"got {mapper_property}"
        )

    relationship_prop = mapper_property
    return relationship_prop._with_parent(instance, from_entity=from_entity)

2054 

2055 

def has_identity(object_: object) -> bool:
    """Return True if the given object has a database
    identity.

    This typically corresponds to the object being
    in either the persistent or detached state.

    The value is read from the ``has_identity`` attribute of the
    object's instance state.

    .. seealso::

        :func:`.was_deleted`

    """
    return attributes.instance_state(object_).has_identity

2070 

2071 

def was_deleted(object_: object) -> bool:
    """Return True if the given object was deleted
    within a session flush.

    This is regardless of whether or not the object is
    persistent or detached.

    The value is read from the ``was_deleted`` attribute of the
    object's instance state.

    .. seealso::

        :attr:`.InstanceState.was_deleted`

    """
    return attributes.instance_state(object_).was_deleted

2087 

2088 

def _entity_corresponds_to(
    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
) -> bool:
    """determine if 'given' corresponds to 'entity', in terms
    of an entity passed to Query that would match the same entity
    being referred to elsewhere in the query.

    """
    if insp_is_aliased_class(entity):
        # an aliased entity only matches another aliased entity that
        # has the same base alias
        if not insp_is_aliased_class(given):
            return False
        return entity._base_alias() is given._base_alias()

    # from here on, 'entity' is not an aliased class
    if insp_is_aliased_class(given):
        if given._use_mapper_path:
            return entity in given.with_polymorphic_mappers
        return entity is given

    # neither side is aliased
    assert insp_is_mapper(given)
    return entity.common_parent(given)

2110 

2111 

def _entity_corresponds_to_use_path_impl(
    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
) -> bool:
    """determine if 'given' corresponds to 'entity', in terms
    of a path of loader options where a mapped attribute is taken to
    be a member of a parent entity.

    e.g.::

        someoption(A).someoption(A.b)  # -> fn(A, A) -> True
        someoption(A).someoption(C.d)  # -> fn(A, C) -> False

        a1 = aliased(A)
        someoption(a1).someoption(A.b)  # -> fn(a1, A) -> False
        someoption(a1).someoption(a1.b)  # -> fn(a1, a1) -> True

        wp = with_polymorphic(A, [A1, A2])
        someoption(wp).someoption(A1.foo)  # -> fn(wp, A1) -> False
        someoption(wp).someoption(wp.A1.foo)  # -> fn(wp, wp.A1) -> True


    """
    entity_is_aliased = insp_is_aliased_class(entity)

    if insp_is_aliased_class(given):
        # aliased 'given': 'entity' must also be aliased, must not use
        # the mapper path, and must be 'given' itself or one of its
        # with-polymorphic entities
        return (
            entity_is_aliased
            and not entity._use_mapper_path
            and (given is entity or entity in given._with_polymorphic_entities)
        )

    if entity_is_aliased:
        # non-aliased 'given' against aliased 'entity': only matches when
        # the alias uses the mapper path and lists 'given' among its
        # with-polymorphic mappers
        return (
            entity._use_mapper_path
            and given in entity.with_polymorphic_mappers
        )

    # neither side is aliased
    return given.isa(entity.mapper)

2147 

2148 

2149def _entity_isa(given: _InternalEntityType[Any], mapper: Mapper[Any]) -> bool: 

2150 """determine if 'given' "is a" mapper, in terms of the given 

2151 would load rows of type 'mapper'. 

2152 

2153 """ 

2154 if given.is_aliased_class: 

2155 return mapper in given.with_polymorphic_mappers or given.mapper.isa( 

2156 mapper 

2157 ) 

2158 elif given.with_polymorphic_mappers: 

2159 return mapper in given.with_polymorphic_mappers or given.isa(mapper) 

2160 else: 

2161 return given.isa(mapper) 

2162 

2163 

2164def _getitem(iterable_query: Query[Any], item: Any) -> Any: 

2165 """calculate __getitem__ in terms of an iterable query object 

2166 that also has a slice() method. 

2167 

2168 """ 

2169 

2170 def _no_negative_indexes(): 

2171 raise IndexError( 

2172 "negative indexes are not accepted by SQL " 

2173 "index / slice operators" 

2174 ) 

2175 

2176 if isinstance(item, slice): 

2177 start, stop, step = util.decode_slice(item) 

2178 

2179 if ( 

2180 isinstance(stop, int) 

2181 and isinstance(start, int) 

2182 and stop - start <= 0 

2183 ): 

2184 return [] 

2185 

2186 elif (isinstance(start, int) and start < 0) or ( 

2187 isinstance(stop, int) and stop < 0 

2188 ): 

2189 _no_negative_indexes() 

2190 

2191 res = iterable_query.slice(start, stop) 

2192 if step is not None: 

2193 return list(res)[None : None : item.step] 

2194 else: 

2195 return list(res) 

2196 else: 

2197 if item == -1: 

2198 _no_negative_indexes() 

2199 else: 

2200 return list(iterable_query[item : item + 1])[0] 

2201 

2202 

def _is_mapped_annotation(
    raw_annotation: _AnnotationScanType,
    cls: Type[Any],
    originating_cls: Type[Any],
) -> bool:
    """Return True if the annotation resolves to a generic whose origin
    is ``_MappedAnnotationBase``, as decided by ``is_origin_of_cls()``.

    The annotation is first passed through ``de_stringify_annotation()``
    using the module of ``originating_cls``.  If that raises
    ``NameError``, False is returned.

    """
    try:
        resolved = de_stringify_annotation(
            cls, raw_annotation, originating_cls.__module__
        )
    except NameError:
        # in most cases, at least within our own tests, we can raise
        # here, which is more accurate as it prevents us from returning
        # false negatives.  However, in the real world, try to avoid getting
        # involved with end-user annotations that have nothing to do with us.
        # see issue #8888 where we bypass using this function in the case
        # that we want to detect an unresolvable Mapped[] type.
        return False

    return is_origin_of_cls(resolved, _MappedAnnotationBase)

2222 

2223 

2224class _CleanupError(Exception): 

2225 pass 

2226 

2227 

2228def _cleanup_mapped_str_annotation( 

2229 annotation: str, originating_module: str 

2230) -> str: 

2231 # fix up an annotation that comes in as the form: 

2232 # 'Mapped[List[Address]]' so that it instead looks like: 

2233 # 'Mapped[List["Address"]]' , which will allow us to get 

2234 # "Address" as a string 

2235 

2236 # additionally, resolve symbols for these names since this is where 

2237 # we'd have to do it 

2238 

2239 inner: Optional[Match[str]] 

2240 

2241 mm = re.match(r"^(.+?)\[(.+)\]$", annotation) 

2242 

2243 if not mm: 

2244 return annotation 

2245 

2246 # ticket #8759. Resolve the Mapped name to a real symbol. 

2247 # originally this just checked the name. 

2248 try: 

2249 obj = eval_name_only(mm.group(1), originating_module) 

2250 except NameError as ne: 

2251 raise _CleanupError( 

2252 f'For annotation "{annotation}", could not resolve ' 

2253 f'container type "{mm.group(1)}". ' 

2254 "Please ensure this type is imported at the module level " 

2255 "outside of TYPE_CHECKING blocks" 

2256 ) from ne 

2257 

2258 if obj is typing.ClassVar: 

2259 real_symbol = "ClassVar" 

2260 else: 

2261 try: 

2262 if issubclass(obj, _MappedAnnotationBase): 

2263 real_symbol = obj.__name__ 

2264 else: 

2265 return annotation 

2266 except TypeError: 

2267 # avoid isinstance(obj, type) check, just catch TypeError 

2268 return annotation 

2269 

2270 # note: if one of the codepaths above didn't define real_symbol and 

2271 # then didn't return, real_symbol raises UnboundLocalError 

2272 # which is actually a NameError, and the calling routines don't 

2273 # notice this since they are catching NameError anyway. Just in case 

2274 # this is being modified in the future, something to be aware of. 

2275 

2276 stack = [] 

2277 inner = mm 

2278 while True: 

2279 stack.append(real_symbol if mm is inner else inner.group(1)) 

2280 g2 = inner.group(2) 

2281 inner = re.match(r"^(.+?)\[(.+)\]$", g2) 

2282 if inner is None: 

2283 stack.append(g2) 

2284 break 

2285 

2286 # stacks we want to rewrite, that is, quote the last entry which 

2287 # we think is a relationship class name: 

2288 # 

2289 # ['Mapped', 'List', 'Address'] 

2290 # ['Mapped', 'A'] 

2291 # 

2292 # stacks we dont want to rewrite, which are generally MappedColumn 

2293 # use cases: 

2294 # 

2295 # ['Mapped', "'Optional[Dict[str, str]]'"] 

2296 # ['Mapped', 'dict[str, str] | None'] 

2297 

2298 if ( 

2299 # avoid already quoted symbols such as 

2300 # ['Mapped', "'Optional[Dict[str, str]]'"] 

2301 not re.match(r"""^["'].*["']$""", stack[-1]) 

2302 # avoid further generics like Dict[] such as 

2303 # ['Mapped', 'dict[str, str] | None'] 

2304 and not re.match(r".*\[.*\]", stack[-1]) 

2305 ): 

2306 stripchars = "\"' " 

2307 stack[-1] = ", ".join( 

2308 f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",") 

2309 ) 

2310 

2311 annotation = "[".join(stack) + ("]" * (len(stack) - 1)) 

2312 

2313 return annotation 

2314 

2315 

2316def _extract_mapped_subtype( 

2317 raw_annotation: Optional[_AnnotationScanType], 

2318 cls: type, 

2319 originating_module: str, 

2320 key: str, 

2321 attr_cls: Type[Any], 

2322 required: bool, 

2323 is_dataclass_field: bool, 

2324 expect_mapped: bool = True, 

2325 raiseerr: bool = True, 

2326) -> Optional[Tuple[Union[type, str], Optional[type]]]: 

2327 """given an annotation, figure out if it's ``Mapped[something]`` and if 

2328 so, return the ``something`` part. 

2329 

2330 Includes error raise scenarios and other options. 

2331 

2332 """ 

2333 

2334 if raw_annotation is None: 

2335 if required: 

2336 raise sa_exc.ArgumentError( 

2337 f"Python typing annotation is required for attribute " 

2338 f'"{cls.__name__}.{key}" when primary argument(s) for ' 

2339 f'"{attr_cls.__name__}" construct are None or not present' 

2340 ) 

2341 return None 

2342 

2343 try: 

2344 annotated = de_stringify_annotation( 

2345 cls, 

2346 raw_annotation, 

2347 originating_module, 

2348 str_cleanup_fn=_cleanup_mapped_str_annotation, 

2349 ) 

2350 except _CleanupError as ce: 

2351 raise sa_exc.ArgumentError( 

2352 f"Could not interpret annotation {raw_annotation}. " 

2353 "Check that it uses names that are correctly imported at the " 

2354 "module level. See chained stack trace for more hints." 

2355 ) from ce 

2356 except NameError as ne: 

2357 if raiseerr and "Mapped[" in raw_annotation: # type: ignore 

2358 raise sa_exc.ArgumentError( 

2359 f"Could not interpret annotation {raw_annotation}. " 

2360 "Check that it uses names that are correctly imported at the " 

2361 "module level. See chained stack trace for more hints." 

2362 ) from ne 

2363 

2364 annotated = raw_annotation # type: ignore 

2365 

2366 if is_dataclass_field: 

2367 return annotated, None 

2368 else: 

2369 if not hasattr(annotated, "__origin__") or not is_origin_of_cls( 

2370 annotated, _MappedAnnotationBase 

2371 ): 

2372 if expect_mapped: 

2373 if not raiseerr: 

2374 return None 

2375 

2376 origin = getattr(annotated, "__origin__", None) 

2377 if origin is typing.ClassVar: 

2378 return None 

2379 

2380 # check for other kind of ORM descriptor like AssociationProxy, 

2381 # don't raise for that (issue #9957) 

2382 elif isinstance(origin, type) and issubclass( 

2383 origin, ORMDescriptor 

2384 ): 

2385 return None 

2386 

2387 raise sa_exc.ArgumentError( 

2388 f'Type annotation for "{cls.__name__}.{key}" ' 

2389 "can't be correctly interpreted for " 

2390 "Annotated Declarative Table form. ORM annotations " 

2391 "should normally make use of the ``Mapped[]`` generic " 

2392 "type, or other ORM-compatible generic type, as a " 

2393 "container for the actual type, which indicates the " 

2394 "intent that the attribute is mapped. " 

2395 "Class variables that are not intended to be mapped " 

2396 "by the ORM should use ClassVar[]. " 

2397 "To allow Annotated Declarative to disregard legacy " 

2398 "annotations which don't use Mapped[] to pass, set " 

2399 '"__allow_unmapped__ = True" on the class or a ' 

2400 "superclass this class.", 

2401 code="zlpr", 

2402 ) 

2403 

2404 else: 

2405 return annotated, None 

2406 

2407 if len(annotated.__args__) != 1: 

2408 raise sa_exc.ArgumentError( 

2409 "Expected sub-type for Mapped[] annotation" 

2410 ) 

2411 

2412 return annotated.__args__[0], annotated.__origin__ 

2413 

2414 

def _mapper_property_as_plain_name(prop: Type[Any]) -> str:
    """Return ``util.clsname_as_plain_name()`` for the given class.

    If ``prop`` has a ``_mapper_property_name()`` callable, its result is
    passed along as the name; otherwise None is passed.

    """
    name = (
        prop._mapper_property_name()
        if hasattr(prop, "_mapper_property_name")
        else None
    )
    return util.clsname_as_plain_name(prop, name)