Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/annotation.py: 62%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

240 statements  

1# sql/annotation.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""The :class:`.Annotated` class and related routines; creates hash-equivalent 

9copies of SQL constructs which contain context-specific markers and 

10associations. 

11 

12Note that the :class:`.Annotated` concept as implemented in this module is not 

13related in any way to the pep-593 concept of "Annotated". 

14 

15 

16""" 

17 

18from __future__ import annotations 

19 

20import typing 

21from typing import Any 

22from typing import Callable 

23from typing import cast 

24from typing import Dict 

25from typing import FrozenSet 

26from typing import Mapping 

27from typing import Optional 

28from typing import overload 

29from typing import Sequence 

30from typing import Tuple 

31from typing import Type 

32from typing import TYPE_CHECKING 

33from typing import TypeVar 

34 

35from . import operators 

36from .cache_key import HasCacheKey 

37from .visitors import anon_map 

38from .visitors import ExternallyTraversible 

39from .visitors import InternalTraversal 

40from .. import util 

41from ..util.typing import Literal 

42from ..util.typing import Self 

43 

44if TYPE_CHECKING: 

45 from .base import _EntityNamespace 

46 from .visitors import _TraverseInternalsType 

47 

48_AnnotationDict = Mapping[str, Any] 

49 

50EMPTY_ANNOTATIONS: util.immutabledict[str, Any] = util.EMPTY_DICT 

51 

52 

53class SupportsAnnotations(ExternallyTraversible): 

54 __slots__ = () 

55 

56 _annotations: util.immutabledict[str, Any] = EMPTY_ANNOTATIONS 

57 

58 proxy_set: util.generic_fn_descriptor[FrozenSet[Any]] 

59 

60 _is_immutable: bool 

61 

62 def _annotate(self, values: _AnnotationDict) -> Self: 

63 raise NotImplementedError() 

64 

65 @overload 

66 def _deannotate( 

67 self, 

68 values: Literal[None] = ..., 

69 clone: bool = ..., 

70 ) -> Self: ... 

71 

72 @overload 

73 def _deannotate( 

74 self, 

75 values: Sequence[str] = ..., 

76 clone: bool = ..., 

77 ) -> SupportsAnnotations: ... 

78 

79 def _deannotate( 

80 self, 

81 values: Optional[Sequence[str]] = None, 

82 clone: bool = False, 

83 ) -> SupportsAnnotations: 

84 raise NotImplementedError() 

85 

86 @util.memoized_property 

87 def _annotations_cache_key(self) -> Tuple[Any, ...]: 

88 anon_map_ = anon_map() 

89 

90 return self._gen_annotations_cache_key(anon_map_) 

91 

92 def _gen_annotations_cache_key( 

93 self, anon_map: anon_map 

94 ) -> Tuple[Any, ...]: 

95 return ( 

96 "_annotations", 

97 tuple( 

98 ( 

99 key, 

100 ( 

101 value._gen_cache_key(anon_map, []) 

102 if isinstance(value, HasCacheKey) 

103 else value 

104 ), 

105 ) 

106 for key, value in [ 

107 (key, self._annotations[key]) 

108 for key in sorted(self._annotations) 

109 ] 

110 ), 

111 ) 

112 

113 

114class SupportsWrappingAnnotations(SupportsAnnotations): 

115 __slots__ = () 

116 

117 _constructor: Callable[..., SupportsWrappingAnnotations] 

118 

119 if TYPE_CHECKING: 

120 

121 @util.ro_non_memoized_property 

122 def entity_namespace(self) -> _EntityNamespace: ... 

123 

124 def _annotate(self, values: _AnnotationDict) -> Self: 

125 """return a copy of this ClauseElement with annotations 

126 updated by the given dictionary. 

127 

128 """ 

129 return Annotated._as_annotated_instance(self, values) # type: ignore 

130 

131 def _with_annotations(self, values: _AnnotationDict) -> Self: 

132 """return a copy of this ClauseElement with annotations 

133 replaced by the given dictionary. 

134 

135 """ 

136 return Annotated._as_annotated_instance(self, values) # type: ignore 

137 

138 @overload 

139 def _deannotate( 

140 self, 

141 values: Literal[None] = ..., 

142 clone: bool = ..., 

143 ) -> Self: ... 

144 

145 @overload 

146 def _deannotate( 

147 self, 

148 values: Sequence[str] = ..., 

149 clone: bool = ..., 

150 ) -> SupportsAnnotations: ... 

151 

152 def _deannotate( 

153 self, 

154 values: Optional[Sequence[str]] = None, 

155 clone: bool = False, 

156 ) -> SupportsAnnotations: 

157 """return a copy of this :class:`_expression.ClauseElement` 

158 with annotations 

159 removed. 

160 

161 :param values: optional tuple of individual values 

162 to remove. 

163 

164 """ 

165 if clone: 

166 s = self._clone() 

167 return s 

168 else: 

169 return self 

170 

171 

172class SupportsCloneAnnotations(SupportsWrappingAnnotations): 

173 # SupportsCloneAnnotations extends from SupportsWrappingAnnotations 

174 # to support the structure of having the base ClauseElement 

175 # be a subclass of SupportsWrappingAnnotations. Any ClauseElement 

176 # subclass that wants to extend from SupportsCloneAnnotations 

177 # will inherently also be subclassing SupportsWrappingAnnotations, so 

178 # make that specific here. 

179 

180 if not typing.TYPE_CHECKING: 

181 __slots__ = () 

182 

183 _clone_annotations_traverse_internals: _TraverseInternalsType = [ 

184 ("_annotations", InternalTraversal.dp_annotations_key) 

185 ] 

186 

187 def _annotate(self, values: _AnnotationDict) -> Self: 

188 """return a copy of this ClauseElement with annotations 

189 updated by the given dictionary. 

190 

191 """ 

192 new = self._clone() 

193 new._annotations = new._annotations.union(values) 

194 new.__dict__.pop("_annotations_cache_key", None) 

195 new.__dict__.pop("_generate_cache_key", None) 

196 return new 

197 

198 def _with_annotations(self, values: _AnnotationDict) -> Self: 

199 """return a copy of this ClauseElement with annotations 

200 replaced by the given dictionary. 

201 

202 """ 

203 new = self._clone() 

204 new._annotations = util.immutabledict(values) 

205 new.__dict__.pop("_annotations_cache_key", None) 

206 new.__dict__.pop("_generate_cache_key", None) 

207 return new 

208 

209 @overload 

210 def _deannotate( 

211 self, 

212 values: Literal[None] = ..., 

213 clone: bool = ..., 

214 ) -> Self: ... 

215 

216 @overload 

217 def _deannotate( 

218 self, 

219 values: Sequence[str] = ..., 

220 clone: bool = ..., 

221 ) -> SupportsAnnotations: ... 

222 

223 def _deannotate( 

224 self, 

225 values: Optional[Sequence[str]] = None, 

226 clone: bool = False, 

227 ) -> SupportsAnnotations: 

228 """return a copy of this :class:`_expression.ClauseElement` 

229 with annotations 

230 removed. 

231 

232 :param values: optional tuple of individual values 

233 to remove. 

234 

235 """ 

236 if clone or self._annotations: 

237 # clone is used when we are also copying 

238 # the expression for a deep deannotation 

239 new = self._clone() 

240 new._annotations = util.immutabledict() 

241 new.__dict__.pop("_annotations_cache_key", None) 

242 return new 

243 else: 

244 return self 

245 

246 

247class Annotated(SupportsAnnotations): 

248 """clones a SupportsAnnotations and applies an 'annotations' dictionary. 

249 

250 Unlike regular clones, this clone also mimics __hash__() and 

251 __eq__() of the original element so that it takes its place 

252 in hashed collections. 

253 

254 A reference to the original element is maintained, for the important 

255 reason of keeping its hash value current. When GC'ed, the 

256 hash value may be reused, causing conflicts. 

257 

258 .. note:: The rationale for Annotated producing a brand new class, 

259 rather than placing the functionality directly within ClauseElement, 

260 is **performance**. The __hash__() method is absent on plain 

261 ClauseElement which leads to significantly reduced function call 

262 overhead, as the use of sets and dictionaries against ClauseElement 

263 objects is prevalent, but most are not "annotated". 

264 

265 """ 

266 

267 _is_column_operators = False 

268 

269 @classmethod 

270 def _as_annotated_instance( 

271 cls, element: SupportsWrappingAnnotations, values: _AnnotationDict 

272 ) -> Annotated: 

273 try: 

274 cls = annotated_classes[element.__class__] 

275 except KeyError: 

276 cls = _new_annotation_type(element.__class__, cls) 

277 return cls(element, values) 

278 

279 _annotations: util.immutabledict[str, Any] 

280 __element: SupportsWrappingAnnotations 

281 _hash: int 

282 

283 def __new__(cls: Type[Self], *args: Any) -> Self: 

284 return object.__new__(cls) 

285 

286 def __init__( 

287 self, element: SupportsWrappingAnnotations, values: _AnnotationDict 

288 ): 

289 self.__dict__ = element.__dict__.copy() 

290 self.__dict__.pop("_annotations_cache_key", None) 

291 self.__dict__.pop("_generate_cache_key", None) 

292 self.__element = element 

293 self._annotations = util.immutabledict(values) 

294 self._hash = hash(element) 

295 

296 def _annotate(self, values: _AnnotationDict) -> Self: 

297 _values = self._annotations.union(values) 

298 new = self._with_annotations(_values) 

299 return new 

300 

301 def _with_annotations(self, values: _AnnotationDict) -> Self: 

302 clone = self.__class__.__new__(self.__class__) 

303 clone.__dict__ = self.__dict__.copy() 

304 clone.__dict__.pop("_annotations_cache_key", None) 

305 clone.__dict__.pop("_generate_cache_key", None) 

306 clone._annotations = util.immutabledict(values) 

307 return clone 

308 

309 @overload 

310 def _deannotate( 

311 self, 

312 values: Literal[None] = ..., 

313 clone: bool = ..., 

314 ) -> Self: ... 

315 

316 @overload 

317 def _deannotate( 

318 self, 

319 values: Sequence[str] = ..., 

320 clone: bool = ..., 

321 ) -> Annotated: ... 

322 

323 def _deannotate( 

324 self, 

325 values: Optional[Sequence[str]] = None, 

326 clone: bool = True, 

327 ) -> SupportsAnnotations: 

328 if values is None: 

329 return self.__element 

330 else: 

331 return self._with_annotations( 

332 util.immutabledict( 

333 { 

334 key: value 

335 for key, value in self._annotations.items() 

336 if key not in values 

337 } 

338 ) 

339 ) 

340 

341 if not typing.TYPE_CHECKING: 

342 # manually proxy some methods that need extra attention 

343 def _compiler_dispatch(self, visitor: Any, **kw: Any) -> Any: 

344 return self.__element.__class__._compiler_dispatch( 

345 self, visitor, **kw 

346 ) 

347 

348 @property 

349 def _constructor(self): 

350 return self.__element._constructor 

351 

352 def _clone(self, **kw: Any) -> Self: 

353 clone = self.__element._clone(**kw) 

354 if clone is self.__element: 

355 # detect immutable, don't change anything 

356 return self 

357 else: 

358 # update the clone with any changes that have occurred 

359 # to this object's __dict__. 

360 clone.__dict__.update(self.__dict__) 

361 return self.__class__(clone, self._annotations) 

362 

363 def __reduce__(self) -> Tuple[Type[Annotated], Tuple[Any, ...]]: 

364 return self.__class__, (self.__element, self._annotations) 

365 

366 def __hash__(self) -> int: 

367 return self._hash 

368 

369 def __eq__(self, other: Any) -> bool: 

370 if self._is_column_operators: 

371 return self.__element.__class__.__eq__(self, other) 

372 else: 

373 return hash(other) == hash(self) 

374 

375 @util.ro_non_memoized_property 

376 def entity_namespace(self) -> _EntityNamespace: 

377 if "entity_namespace" in self._annotations: 

378 return cast( 

379 SupportsWrappingAnnotations, 

380 self._annotations["entity_namespace"], 

381 ).entity_namespace 

382 else: 

383 return self.__element.entity_namespace 

384 

385 

386# hard-generate Annotated subclasses. this technique 

387# is used instead of on-the-fly types (i.e. type.__new__()) 

388# so that the resulting objects are pickleable; additionally, other 

389# decisions can be made up front about the type of object being annotated 

390# just once per class rather than per-instance. 

391annotated_classes: Dict[Type[SupportsWrappingAnnotations], Type[Annotated]] = ( 

392 {} 

393) 

394 

395_SA = TypeVar("_SA", bound="SupportsAnnotations") 

396 

397 

398def _safe_annotate(to_annotate: _SA, annotations: _AnnotationDict) -> _SA: 

399 try: 

400 _annotate = to_annotate._annotate 

401 except AttributeError: 

402 # skip objects that don't actually have an `_annotate` 

403 # attribute, namely QueryableAttribute inside of a join 

404 # condition 

405 return to_annotate 

406 else: 

407 return _annotate(annotations) 

408 

409 

410def _deep_annotate( 

411 element: _SA, 

412 annotations: _AnnotationDict, 

413 exclude: Optional[Sequence[SupportsAnnotations]] = None, 

414 *, 

415 detect_subquery_cols: bool = False, 

416 ind_cols_on_fromclause: bool = False, 

417 annotate_callable: Optional[ 

418 Callable[[SupportsAnnotations, _AnnotationDict], SupportsAnnotations] 

419 ] = None, 

420) -> _SA: 

421 """Deep copy the given ClauseElement, annotating each element 

422 with the given annotations dictionary. 

423 

424 Elements within the exclude collection will be cloned but not annotated. 

425 

426 """ 

427 

428 # annotated objects hack the __hash__() method so if we want to 

429 # uniquely process them we have to use id() 

430 

431 cloned_ids: Dict[int, SupportsAnnotations] = {} 

432 

433 def clone(elem: SupportsAnnotations, **kw: Any) -> SupportsAnnotations: 

434 # ind_cols_on_fromclause means make sure an AnnotatedFromClause 

435 # has its own .c collection independent of that which its proxying. 

436 # this is used specifically by orm.LoaderCriteriaOption to break 

437 # a reference cycle that it's otherwise prone to building, 

438 # see test_relationship_criteria-> 

439 # test_loader_criteria_subquery_w_same_entity. logic here was 

440 # changed for #8796 and made explicit; previously it occurred 

441 # by accident 

442 

443 kw["detect_subquery_cols"] = detect_subquery_cols 

444 id_ = id(elem) 

445 

446 if id_ in cloned_ids: 

447 return cloned_ids[id_] 

448 

449 if ( 

450 exclude 

451 and hasattr(elem, "proxy_set") 

452 and elem.proxy_set.intersection(exclude) 

453 ): 

454 newelem = elem._clone(clone=clone, **kw) 

455 elif annotations != elem._annotations: 

456 if detect_subquery_cols and elem._is_immutable: 

457 to_annotate = elem._clone(clone=clone, **kw) 

458 else: 

459 to_annotate = elem 

460 if annotate_callable: 

461 newelem = annotate_callable(to_annotate, annotations) 

462 else: 

463 newelem = _safe_annotate(to_annotate, annotations) 

464 else: 

465 newelem = elem 

466 

467 newelem._copy_internals( 

468 clone=clone, ind_cols_on_fromclause=ind_cols_on_fromclause 

469 ) 

470 

471 cloned_ids[id_] = newelem 

472 return newelem 

473 

474 if element is not None: 

475 element = cast(_SA, clone(element)) 

476 clone = None # type: ignore # remove gc cycles 

477 return element 

478 

479 

480@overload 

481def _deep_deannotate( 

482 element: Literal[None], values: Optional[Sequence[str]] = None 

483) -> Literal[None]: ... 

484 

485 

486@overload 

487def _deep_deannotate( 

488 element: _SA, values: Optional[Sequence[str]] = None 

489) -> _SA: ... 

490 

491 

492def _deep_deannotate( 

493 element: Optional[_SA], values: Optional[Sequence[str]] = None 

494) -> Optional[_SA]: 

495 """Deep copy the given element, removing annotations.""" 

496 

497 cloned: Dict[Any, SupportsAnnotations] = {} 

498 

499 def clone(elem: SupportsAnnotations, **kw: Any) -> SupportsAnnotations: 

500 key: Any 

501 if values: 

502 key = id(elem) 

503 else: 

504 key = elem 

505 

506 if key not in cloned: 

507 newelem = elem._deannotate(values=values, clone=True) 

508 newelem._copy_internals(clone=clone) 

509 cloned[key] = newelem 

510 return newelem 

511 else: 

512 return cloned[key] 

513 

514 if element is not None: 

515 element = cast(_SA, clone(element)) 

516 clone = None # type: ignore # remove gc cycles 

517 return element 

518 

519 

520def _shallow_annotate(element: _SA, annotations: _AnnotationDict) -> _SA: 

521 """Annotate the given ClauseElement and copy its internals so that 

522 internal objects refer to the new annotated object. 

523 

524 Basically used to apply a "don't traverse" annotation to a 

525 selectable, without digging throughout the whole 

526 structure wasting time. 

527 """ 

528 element = element._annotate(annotations) 

529 element._copy_internals() 

530 return element 

531 

532 

533def _new_annotation_type( 

534 cls: Type[SupportsWrappingAnnotations], base_cls: Type[Annotated] 

535) -> Type[Annotated]: 

536 """Generates a new class that subclasses Annotated and proxies a given 

537 element type. 

538 

539 """ 

540 if issubclass(cls, Annotated): 

541 return cls 

542 elif cls in annotated_classes: 

543 return annotated_classes[cls] 

544 

545 for super_ in cls.__mro__: 

546 # check if an Annotated subclass more specific than 

547 # the given base_cls is already registered, such 

548 # as AnnotatedColumnElement. 

549 if super_ in annotated_classes: 

550 base_cls = annotated_classes[super_] 

551 break 

552 

553 annotated_classes[cls] = anno_cls = cast( 

554 Type[Annotated], 

555 type("Annotated%s" % cls.__name__, (base_cls, cls), {}), 

556 ) 

557 globals()["Annotated%s" % cls.__name__] = anno_cls 

558 

559 if "_traverse_internals" in cls.__dict__: 

560 anno_cls._traverse_internals = list(cls._traverse_internals) + [ 

561 ("_annotations", InternalTraversal.dp_annotations_key) 

562 ] 

563 elif cls.__dict__.get("inherit_cache", False): 

564 anno_cls._traverse_internals = list(cls._traverse_internals) + [ 

565 ("_annotations", InternalTraversal.dp_annotations_key) 

566 ] 

567 

568 # some classes include this even if they have traverse_internals 

569 # e.g. BindParameter, add it if present. 

570 if cls.__dict__.get("inherit_cache", False): 

571 anno_cls.inherit_cache = True # type: ignore 

572 elif "inherit_cache" in cls.__dict__: 

573 anno_cls.inherit_cache = cls.__dict__["inherit_cache"] # type: ignore 

574 

575 anno_cls._is_column_operators = issubclass(cls, operators.ColumnOperators) 

576 

577 return anno_cls 

578 

579 

580def _prepare_annotations( 

581 target_hierarchy: Type[SupportsWrappingAnnotations], 

582 base_cls: Type[Annotated], 

583) -> None: 

584 for cls in util.walk_subclasses(target_hierarchy): 

585 _new_annotation_type(cls, base_cls)