Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/annotation.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

241 statements  

1# sql/annotation.py 

2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""The :class:`.Annotated` class and related routines; creates hash-equivalent 

9copies of SQL constructs which contain context-specific markers and 

10associations. 

11 

12Note that the :class:`.Annotated` concept as implemented in this module is not 

13related in any way to the pep-593 concept of "Annotated". 

14 

15 

16""" 

17 

18from __future__ import annotations 

19 

20from operator import itemgetter 

21import typing 

22from typing import Any 

23from typing import Callable 

24from typing import cast 

25from typing import Dict 

26from typing import FrozenSet 

27from typing import Mapping 

28from typing import Optional 

29from typing import overload 

30from typing import Sequence 

31from typing import Tuple 

32from typing import Type 

33from typing import TYPE_CHECKING 

34from typing import TypeVar 

35 

36from . import operators 

37from .cache_key import HasCacheKey 

38from .visitors import anon_map 

39from .visitors import ExternallyTraversible 

40from .visitors import InternalTraversal 

41from .. import util 

42from ..util.typing import Literal 

43from ..util.typing import Self 

44 

45if TYPE_CHECKING: 

46 from .base import _EntityNamespace 

47 from .visitors import _TraverseInternalsType 

48 

# Read-only mapping of annotation names to arbitrary values; this is the
# argument type accepted by the ``_annotate()`` family of methods.
_AnnotationDict = Mapping[str, Any]

# Shared empty annotations mapping, used as the class-level default for
# ``SupportsAnnotations._annotations``.
EMPTY_ANNOTATIONS: util.immutabledict[str, Any] = util.EMPTY_DICT


class SupportsAnnotations(ExternallyTraversible):
    """Base for traversible objects carrying an ``_annotations`` mapping.

    The annotate / deannotate hooks defined here only raise
    ``NotImplementedError``; this class itself supplies the generation
    of a cache key from the current ``_annotations``.

    """

    __slots__ = ()

    _annotations: util.immutabledict[str, Any] = EMPTY_ANNOTATIONS

    proxy_set: util.generic_fn_descriptor[FrozenSet[Any]]

    _is_immutable: bool

    def _annotate(self, values: _AnnotationDict) -> Self:
        """Not implemented on this base; always raises."""
        raise NotImplementedError()

    @overload
    def _deannotate(
        self,
        values: Literal[None] = ...,
        clone: bool = ...,
    ) -> Self: ...

    @overload
    def _deannotate(
        self,
        values: Sequence[str] = ...,
        clone: bool = ...,
    ) -> SupportsAnnotations: ...

    def _deannotate(
        self,
        values: Optional[Sequence[str]] = None,
        clone: bool = False,
    ) -> SupportsAnnotations:
        """Not implemented on this base; always raises."""
        raise NotImplementedError()

    @util.memoized_property
    def _annotations_cache_key(self) -> Tuple[Any, ...]:
        """Cache key for ``_annotations``, built with a fresh ``anon_map``."""
        return self._gen_annotations_cache_key(anon_map())

    def _gen_annotations_cache_key(
        self, anon_map: anon_map
    ) -> Tuple[Any, ...]:
        """Return ``("_annotations", <pairs>)`` for the current annotations.

        ``<pairs>`` is a tuple of ``(key, value)`` entries ordered by key.
        A value that is a :class:`.HasCacheKey` is replaced by the result
        of its ``_gen_cache_key(anon_map, [])``; any other value is used
        as is.

        """
        entries = []
        for key, value in sorted(self._annotations.items(), key=_get_item0):
            if isinstance(value, HasCacheKey):
                value = value._gen_cache_key(anon_map, [])
            entries.append((key, value))
        return ("_annotations", tuple(entries))


# sort key selecting the name out of a ``(name, value)`` annotation pair
_get_item0 = itemgetter(0)

115 

116 

class SupportsWrappingAnnotations(SupportsAnnotations):
    """Annotation support implemented by wrapping in :class:`.Annotated`.

    Both ``_annotate()`` and ``_with_annotations()`` hand this object and
    the given values to ``Annotated._as_annotated_instance()``.

    """

    __slots__ = ()

    _constructor: Callable[..., SupportsWrappingAnnotations]

    if TYPE_CHECKING:

        @util.ro_non_memoized_property
        def entity_namespace(self) -> _EntityNamespace: ...

    def _annotate(self, values: _AnnotationDict) -> Self:
        """Return an annotated copy of this ClauseElement carrying the
        given dictionary of annotations.

        """
        annotated = Annotated._as_annotated_instance(self, values)
        return annotated  # type: ignore

    def _with_annotations(self, values: _AnnotationDict) -> Self:
        """Return an annotated copy of this ClauseElement whose
        annotations are exactly the given dictionary.

        """
        annotated = Annotated._as_annotated_instance(self, values)
        return annotated  # type: ignore

    @overload
    def _deannotate(
        self,
        values: Literal[None] = ...,
        clone: bool = ...,
    ) -> Self: ...

    @overload
    def _deannotate(
        self,
        values: Sequence[str] = ...,
        clone: bool = ...,
    ) -> SupportsAnnotations: ...

    def _deannotate(
        self,
        values: Optional[Sequence[str]] = None,
        clone: bool = False,
    ) -> SupportsAnnotations:
        """Return this :class:`_expression.ClauseElement` without
        annotations.

        :param values: optional tuple of individual values to remove;
         not consulted by this implementation.

        :param clone: when True, the result of ``self._clone()`` is
         returned; otherwise this same object is returned.

        """
        return self._clone() if clone else self

173 

174 

class SupportsCloneAnnotations(SupportsWrappingAnnotations):
    """Annotation support implemented by cloning the element itself.

    Every operation works on the result of ``self._clone()``, assigns a
    new ``_annotations`` mapping to it and discards the memoized cache
    keys that were derived from the previous annotations.

    """

    # SupportsCloneAnnotations extends from SupportsWrappingAnnotations
    # to support the structure of having the base ClauseElement
    # be a subclass of SupportsWrappingAnnotations.  Any ClauseElement
    # subclass that wants to extend from SupportsCloneAnnotations
    # will inherently also be subclassing SupportsWrappingAnnotations, so
    # make that specific here.

    if not typing.TYPE_CHECKING:
        __slots__ = ()

    _clone_annotations_traverse_internals: _TraverseInternalsType = [
        ("_annotations", InternalTraversal.dp_annotations_key)
    ]

    def _annotate(self, values: _AnnotationDict) -> Self:
        """return a copy of this ClauseElement with annotations
        updated by the given dictionary.

        """
        new = self._clone()
        new._annotations = new._annotations.union(values)
        # memoized keys describe the old annotations; drop them
        new.__dict__.pop("_annotations_cache_key", None)
        new.__dict__.pop("_generate_cache_key", None)
        return new

    def _with_annotations(self, values: _AnnotationDict) -> Self:
        """return a copy of this ClauseElement with annotations
        replaced by the given dictionary.

        """
        new = self._clone()
        new._annotations = util.immutabledict(values)
        # memoized keys describe the old annotations; drop them
        new.__dict__.pop("_annotations_cache_key", None)
        new.__dict__.pop("_generate_cache_key", None)
        return new

    @overload
    def _deannotate(
        self,
        values: Literal[None] = ...,
        clone: bool = ...,
    ) -> Self: ...

    @overload
    def _deannotate(
        self,
        values: Sequence[str] = ...,
        clone: bool = ...,
    ) -> SupportsAnnotations: ...

    def _deannotate(
        self,
        values: Optional[Sequence[str]] = None,
        clone: bool = False,
    ) -> SupportsAnnotations:
        """return a copy of this :class:`_expression.ClauseElement`
        with annotations removed.

        :param values: optional tuple of individual values
         to remove.  NOTE: this implementation does not consult
         ``values``; all annotations are removed from the copy.

        :param clone: when True, a copy is produced even if there are
         no annotations present.  When False and there are no
         annotations, this same object is returned.

        """
        if clone or self._annotations:
            # clone is used when we are also copying
            # the expression for a deep deannotation
            new = self._clone()
            new._annotations = util.immutabledict()
            # discard both memoized keys, consistently with _annotate()
            # and _with_annotations(), so that no key generated for the
            # annotated form can remain on the deannotated copy
            new.__dict__.pop("_annotations_cache_key", None)
            new.__dict__.pop("_generate_cache_key", None)
            return new
        else:
            return self

248 

249 

class Annotated(SupportsAnnotations):
    """Copy of a SupportsAnnotations element carrying an 'annotations'
    dictionary.

    In contrast to an ordinary clone, this copy reproduces the
    ``__hash__()`` and ``__eq__()`` behavior of the element it was made
    from, so that it can stand in for that element in hashed
    collections.

    The original element stays referenced from the copy; this is
    important in order to keep its hash value current.  When GC'ed, the
    hash value may be reused, causing conflicts.

    .. note::  The rationale for Annotated producing a brand new class,
       rather than placing the functionality directly within
       ClauseElement, is **performance**.  The ``__hash__()`` method is
       absent on plain ClauseElement which leads to significantly
       reduced function call overhead, as the use of sets and
       dictionaries against ClauseElement objects is prevalent, but most
       are not "annotated".

    """

    _is_column_operators = False

    _annotations: util.immutabledict[str, Any]
    __element: SupportsWrappingAnnotations
    _hash: int

    @classmethod
    def _as_annotated_instance(
        cls, element: SupportsWrappingAnnotations, values: _AnnotationDict
    ) -> Annotated:
        """Wrap ``element`` using the Annotated subclass for its type.

        The subclass is looked up in ``annotated_classes``; when none is
        registered for the element's class, one is produced by
        ``_new_annotation_type()``.

        """
        element_type = element.__class__
        anno_cls = annotated_classes.get(element_type)
        if anno_cls is None:
            anno_cls = _new_annotation_type(element_type, cls)
        return anno_cls(element, values)

    def __new__(cls: Type[Self], *args: Any) -> Self:
        return object.__new__(cls)

    def __init__(
        self, element: SupportsWrappingAnnotations, values: _AnnotationDict
    ):
        # start from a copy of the element's state, minus the memoized
        # cache keys which were generated without these annotations
        state = element.__dict__.copy()
        state.pop("_annotations_cache_key", None)
        state.pop("_generate_cache_key", None)
        self.__dict__ = state
        self.__element = element
        self._annotations = util.immutabledict(values)
        self._hash = hash(element)

    def _annotate(self, values: _AnnotationDict) -> Self:
        """Return a copy whose annotations are the union of the current
        annotations and ``values``."""
        return self._with_annotations(self._annotations.union(values))

    def _with_annotations(self, values: _AnnotationDict) -> Self:
        """Return a copy whose annotations are exactly ``values``."""
        own_type = self.__class__
        copied = own_type.__new__(own_type)
        copied.__dict__ = self.__dict__.copy()
        copied.__dict__.pop("_annotations_cache_key", None)
        copied.__dict__.pop("_generate_cache_key", None)
        copied._annotations = util.immutabledict(values)
        return copied

    @overload
    def _deannotate(
        self,
        values: Literal[None] = ...,
        clone: bool = ...,
    ) -> Self: ...

    @overload
    def _deannotate(
        self,
        values: Sequence[str] = ...,
        clone: bool = ...,
    ) -> Annotated: ...

    def _deannotate(
        self,
        values: Optional[Sequence[str]] = None,
        clone: bool = True,
    ) -> SupportsAnnotations:
        """Remove annotations.

        With ``values`` of None the original, wrapped element is
        returned.  Otherwise a copy is returned which retains only the
        annotations whose keys are not present in ``values``.  The
        ``clone`` argument is not consulted.

        """
        if values is None:
            return self.__element

        remaining = {
            key: value
            for key, value in self._annotations.items()
            if key not in values
        }
        return self._with_annotations(util.immutabledict(remaining))

    if not typing.TYPE_CHECKING:
        # manually proxy some methods that need extra attention
        def _compiler_dispatch(self, visitor: Any, **kw: Any) -> Any:
            element_type = self.__element.__class__
            return element_type._compiler_dispatch(self, visitor, **kw)

        @property
        def _constructor(self):
            return self.__element._constructor

    def _clone(self, **kw: Any) -> Self:
        """Clone the wrapped element and re-wrap the clone with the
        current annotations."""
        inner = self.__element._clone(**kw)
        if inner is self.__element:
            # detect immutable, don't change anything
            return self

        # update the clone with any changes that have occurred
        # to this object's __dict__.
        inner.__dict__.update(self.__dict__)
        return self.__class__(inner, self._annotations)

    def __reduce__(self) -> Tuple[Type[Annotated], Tuple[Any, ...]]:
        return self.__class__, (self.__element, self._annotations)

    def __hash__(self) -> int:
        return self._hash

    def __eq__(self, other: Any) -> bool:
        if not self._is_column_operators:
            return hash(other) == hash(self)
        # column-operator types build their comparison via the
        # original class's __eq__
        return self.__element.__class__.__eq__(self, other)

    @util.ro_non_memoized_property
    def entity_namespace(self) -> _EntityNamespace:
        """The entity namespace of the ``"entity_namespace"`` annotation
        when one is present, else that of the wrapped element."""
        current = self._annotations
        if "entity_namespace" not in current:
            return self.__element.entity_namespace
        return cast(
            SupportsWrappingAnnotations,
            current["entity_namespace"],
        ).entity_namespace

387 

388 

389# hard-generate Annotated subclasses. this technique 

390# is used instead of on-the-fly types (i.e. type.__new__()) 

391# so that the resulting objects are pickleable; additionally, other 

392# decisions can be made up front about the type of object being annotated 

393# just once per class rather than per-instance. 

394annotated_classes: Dict[Type[SupportsWrappingAnnotations], Type[Annotated]] = ( 

395 {} 

396) 

397 

398_SA = TypeVar("_SA", bound="SupportsAnnotations") 

399 

400 

401def _safe_annotate(to_annotate: _SA, annotations: _AnnotationDict) -> _SA: 

402 try: 

403 _annotate = to_annotate._annotate 

404 except AttributeError: 

405 # skip objects that don't actually have an `_annotate` 

406 # attribute, namely QueryableAttribute inside of a join 

407 # condition 

408 return to_annotate 

409 else: 

410 return _annotate(annotations) 

411 

412 

413def _deep_annotate( 

414 element: _SA, 

415 annotations: _AnnotationDict, 

416 exclude: Optional[Sequence[SupportsAnnotations]] = None, 

417 *, 

418 detect_subquery_cols: bool = False, 

419 ind_cols_on_fromclause: bool = False, 

420 annotate_callable: Optional[ 

421 Callable[[SupportsAnnotations, _AnnotationDict], SupportsAnnotations] 

422 ] = None, 

423) -> _SA: 

424 """Deep copy the given ClauseElement, annotating each element 

425 with the given annotations dictionary. 

426 

427 Elements within the exclude collection will be cloned but not annotated. 

428 

429 """ 

430 

431 # annotated objects hack the __hash__() method so if we want to 

432 # uniquely process them we have to use id() 

433 

434 cloned_ids: Dict[int, SupportsAnnotations] = {} 

435 

436 def clone(elem: SupportsAnnotations, **kw: Any) -> SupportsAnnotations: 

437 # ind_cols_on_fromclause means make sure an AnnotatedFromClause 

438 # has its own .c collection independent of that which its proxying. 

439 # this is used specifically by orm.LoaderCriteriaOption to break 

440 # a reference cycle that it's otherwise prone to building, 

441 # see test_relationship_criteria-> 

442 # test_loader_criteria_subquery_w_same_entity. logic here was 

443 # changed for #8796 and made explicit; previously it occurred 

444 # by accident 

445 

446 kw["detect_subquery_cols"] = detect_subquery_cols 

447 id_ = id(elem) 

448 

449 if id_ in cloned_ids: 

450 return cloned_ids[id_] 

451 

452 if ( 

453 exclude 

454 and hasattr(elem, "proxy_set") 

455 and elem.proxy_set.intersection(exclude) 

456 ): 

457 newelem = elem._clone(clone=clone, **kw) 

458 elif annotations != elem._annotations: 

459 if detect_subquery_cols and elem._is_immutable: 

460 to_annotate = elem._clone(clone=clone, **kw) 

461 else: 

462 to_annotate = elem 

463 if annotate_callable: 

464 newelem = annotate_callable(to_annotate, annotations) 

465 else: 

466 newelem = _safe_annotate(to_annotate, annotations) 

467 else: 

468 newelem = elem 

469 

470 newelem._copy_internals( 

471 clone=clone, ind_cols_on_fromclause=ind_cols_on_fromclause 

472 ) 

473 

474 cloned_ids[id_] = newelem 

475 return newelem 

476 

477 if element is not None: 

478 element = cast(_SA, clone(element)) 

479 clone = None # type: ignore # remove gc cycles 

480 return element 

481 

482 

483@overload 

484def _deep_deannotate( 

485 element: Literal[None], values: Optional[Sequence[str]] = None 

486) -> Literal[None]: ... 

487 

488 

489@overload 

490def _deep_deannotate( 

491 element: _SA, values: Optional[Sequence[str]] = None 

492) -> _SA: ... 

493 

494 

495def _deep_deannotate( 

496 element: Optional[_SA], values: Optional[Sequence[str]] = None 

497) -> Optional[_SA]: 

498 """Deep copy the given element, removing annotations.""" 

499 

500 cloned: Dict[Any, SupportsAnnotations] = {} 

501 

502 def clone(elem: SupportsAnnotations, **kw: Any) -> SupportsAnnotations: 

503 key: Any 

504 if values: 

505 key = id(elem) 

506 else: 

507 key = elem 

508 

509 if key not in cloned: 

510 newelem = elem._deannotate(values=values, clone=True) 

511 newelem._copy_internals(clone=clone) 

512 cloned[key] = newelem 

513 return newelem 

514 else: 

515 return cloned[key] 

516 

517 if element is not None: 

518 element = cast(_SA, clone(element)) 

519 clone = None # type: ignore # remove gc cycles 

520 return element 

521 

522 

523def _shallow_annotate(element: _SA, annotations: _AnnotationDict) -> _SA: 

524 """Annotate the given ClauseElement and copy its internals so that 

525 internal objects refer to the new annotated object. 

526 

527 Basically used to apply a "don't traverse" annotation to a 

528 selectable, without digging throughout the whole 

529 structure wasting time. 

530 """ 

531 element = element._annotate(annotations) 

532 element._copy_internals() 

533 return element 

534 

535 

def _new_annotation_type(
    cls: Type[SupportsWrappingAnnotations], base_cls: Type[Annotated]
) -> Type[Annotated]:
    """Return the Annotated subclass that proxies the given element type,
    generating and registering it if it does not exist yet.

    A class that is already an Annotated subclass is returned as is.  A
    newly generated class is stored in ``annotated_classes`` and also
    placed in this module's globals under the name ``Annotated<name>``.

    """
    if issubclass(cls, Annotated):
        return cls
    if cls in annotated_classes:
        return annotated_classes[cls]

    # check if an Annotated subclass more specific than
    # the given base_cls is already registered, such
    # as AnnotatedColumnElement.
    base_cls = next(
        (
            annotated_classes[super_]
            for super_ in cls.__mro__
            if super_ in annotated_classes
        ),
        base_cls,
    )

    anno_name = "Annotated%s" % cls.__name__
    anno_cls = cast(Type[Annotated], type(anno_name, (base_cls, cls), {}))
    annotated_classes[cls] = anno_cls
    globals()[anno_name] = anno_cls

    own_attrs = cls.__dict__
    inherits_cache = own_attrs.get("inherit_cache", False)

    # extend the traversal with the annotations when the class declares
    # its own _traverse_internals or opts in to inherit_cache
    if "_traverse_internals" in own_attrs or inherits_cache:
        anno_cls._traverse_internals = list(cls._traverse_internals) + [
            ("_annotations", InternalTraversal.dp_annotations_key)
        ]

    # some classes include this even if they have traverse_internals
    # e.g. BindParameter, add it if present.
    if inherits_cache:
        anno_cls.inherit_cache = True  # type: ignore
    elif "inherit_cache" in own_attrs:
        anno_cls.inherit_cache = own_attrs["inherit_cache"]  # type: ignore

    anno_cls._is_column_operators = issubclass(cls, operators.ColumnOperators)

    return anno_cls

581 

582 

def _prepare_annotations(
    target_hierarchy: Type[SupportsWrappingAnnotations],
    base_cls: Type[Annotated],
) -> None:
    """Call ``_new_annotation_type()`` for every class yielded by
    ``util.walk_subclasses(target_hierarchy)``, passing ``base_cls``
    along as the Annotated base."""
    for subclass in util.walk_subclasses(target_hierarchy):
        _new_annotation_type(subclass, base_cls)

588 _new_annotation_type(cls, base_cls)