Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/util/typing.py: 49%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

291 statements  

1# util/typing.py 

2# Copyright (C) 2022-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9from __future__ import annotations 

10 

11import builtins 

12from collections import deque 

13import collections.abc as collections_abc 

14import re 

15import sys 

16from types import NoneType 

17import typing 

18from typing import Any 

19from typing import Callable 

20from typing import Dict 

21from typing import ForwardRef 

22from typing import Generic 

23from typing import get_args 

24from typing import get_origin 

25from typing import Iterable 

26from typing import Literal 

27from typing import Mapping 

28from typing import NewType 

29from typing import NoReturn 

30from typing import Optional 

31from typing import overload 

32from typing import Protocol 

33from typing import Set 

34from typing import Tuple 

35from typing import Type 

36from typing import TYPE_CHECKING 

37from typing import TypeGuard 

38from typing import Union 

39 

40import typing_extensions 

41 

42if True: # zimports removes the tailing comments 

43 from typing_extensions import ( 

44 dataclass_transform as dataclass_transform, # 3.11, 

45 ) 

46 from typing_extensions import NotRequired as NotRequired # 3.11 

47 from typing_extensions import TypeVarTuple as TypeVarTuple # 3.11 

48 from typing_extensions import Self as Self # 3.11 

49 from typing_extensions import TypeAliasType as TypeAliasType # 3.12 

50 from typing_extensions import Unpack as Unpack # 3.11 

51 from typing_extensions import Never as Never # 3.11 

52 from typing_extensions import LiteralString as LiteralString # 3.11 

53 from typing_extensions import TypeVar as TypeVar # 3.13 for default 

54 

55 

# Type variables used by the generic protocols and helpers in this module.
_T = TypeVar("_T", bound=Any)
_KT = TypeVar("_KT")
_VT = TypeVar("_VT")
# covariant value type, used for read-only mapping-style protocols
_VT_co = TypeVar("_VT_co", covariant=True)

# Alias for a variable-length tuple whose elements may be of any type.
TupleAny = Tuple[Any, ...]

62 

63 

def is_fwd_none(typ: Any) -> bool:
    """Return True if ``typ`` is a ``ForwardRef`` whose text is ``"None"``.

    Anything that is not a ``ForwardRef`` instance yields False.

    """
    if not isinstance(typ, ForwardRef):
        return False
    return typ.__forward_arg__ == "None"

66 

67 

# Every form an annotation may take while it is being scanned: a real
# class, a plain string, a ForwardRef, a NewType, a PEP 695 alias or a
# generic alias (described structurally by GenericProtocol).
_AnnotationScanType = Union[
    Type[Any], str, ForwardRef, NewType, TypeAliasType, "GenericProtocol[Any]"
]

# Same members as _AnnotationScanType minus the ``str`` and ``ForwardRef``
# forms.
_MatchedOnType = Union[
    "GenericProtocol[Any]", TypeAliasType, NewType, Type[Any]
]

75 

76 

class ArgsTypeProtocol(Protocol):
    """Protocol for types that have ``__args__``.

    There's no public interface for this AFAIK.

    """

    # the type arguments carried by the object
    __args__: Tuple[_AnnotationScanType, ...]

85 

86 

class GenericProtocol(Protocol[_T]):
    """Protocol for generic types.

    This exists because Python's ``typing._GenericAlias`` is private.

    """

    # the type arguments of the generic alias
    __args__: Tuple[_AnnotationScanType, ...]
    # the un-parameterized type the alias was built from
    __origin__: Type[_T]

    # Python's builtin _GenericAlias has this method, however builtins like
    # list, dict, etc. do not, even though they have ``__origin__`` and
    # ``__args__``
    #
    # def copy_with(self, params: Tuple[_AnnotationScanType, ...]) -> Type[_T]:
    #     ...

103 

104 

# copied from TypeShed, required in order to implement
# MutableMapping.update()
class SupportsKeysAndGetItem(Protocol[_KT, _VT_co]):
    """Structural type for objects exposing ``keys()`` and ``__getitem__``."""

    def keys(self) -> Iterable[_KT]: ...

    def __getitem__(self, __k: _KT) -> _VT_co: ...

111 

112 

# work around https://github.com/microsoft/pyright/issues/3025
# (named alias for the literal type of the string "*")
_LiteralStar = Literal["*"]

115 

116 

def de_stringify_annotation(
    cls: Type[Any],
    annotation: _AnnotationScanType,
    originating_module: str,
    locals_: Mapping[str, Any],
    *,
    str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
    include_generic: bool = False,
    _already_seen: Optional[Set[Any]] = None,
) -> Type[Any]:
    """Turn a possibly string-based annotation into a real object.

    Modules using ``from __future__ import annotations`` store every
    annotation as a string; this evaluates such strings (and ``ForwardRef``
    objects) so that generic containers such as ``Mapped``, ``Union`` or
    ``List`` become usable objects.

    :param cls: class whose namespace takes part in the evaluation.
    :param annotation: the annotation to resolve.
    :param originating_module: name of the module, looked up in
     ``sys.modules``, whose globals are used for evaluation.
    :param locals_: extra names made available to the evaluation.
    :param str_cleanup_fn: optional callable receiving the annotation
     string and the module name, returning the string to evaluate.
    :param include_generic: when True, the arguments of a generic
     (non-``Literal``) annotation are resolved recursively as well.
    :param _already_seen: internal; generics visited during recursion.
    :return: the resolved annotation.  If a generic is met a second time
     during recursion, the annotation as originally passed is returned.

    """
    # looked at typing.get_type_hints(), looked at pydantic.  We need much
    # less here, and we here try to not use any private typing internals
    # or construct ForwardRef objects which is documented as something
    # that should be avoided.

    unresolved = annotation

    if is_fwd_ref(annotation):
        annotation = annotation.__forward_arg__

    if isinstance(annotation, str):
        if str_cleanup_fn:
            annotation = str_cleanup_fn(annotation, originating_module)

        annotation = eval_expression(
            annotation, originating_module, locals_=locals_, in_class=cls
        )

    descend = (
        include_generic
        and is_generic(annotation)
        and not is_literal(annotation)
    )
    if not descend:
        return annotation  # type: ignore

    if _already_seen is None:
        _already_seen = set()

    if annotation in _already_seen:
        # only occurs recursively.  outermost return type
        # will always be Type.
        # the element here will be either ForwardRef or
        # Optional[ForwardRef]
        return unresolved  # type: ignore
    _already_seen.add(annotation)

    resolved_args = []
    for arg in annotation.__args__:
        resolved_args.append(
            de_stringify_annotation(
                cls,
                arg,
                originating_module,
                locals_,
                str_cleanup_fn=str_cleanup_fn,
                include_generic=include_generic,
                _already_seen=_already_seen,
            )
        )

    return _copy_generic_annotation_with(annotation, tuple(resolved_args))

186 

187 

def fixup_container_fwd_refs(
    type_: _AnnotationScanType,
) -> _AnnotationScanType:
    """Correct dict['x', 'y'] into dict[ForwardRef('x'), ForwardRef('y')]
    and similar for list, set.

    Only generics whose origin is one of the listed builtin / ``abc``
    containers and whose ``repr()`` does not look like a ``typing.*`` alias
    are rewritten; everything else is returned untouched.

    """
    if not is_generic(type_):
        return type_

    container = get_origin(type_)
    if container not in (
        dict,
        set,
        list,
        collections_abc.MutableSet,
        collections_abc.MutableMapping,
        collections_abc.MutableSequence,
        collections_abc.Mapping,
        collections_abc.Sequence,
    ):
        return type_

    # fight, kick and scream to struggle to tell the difference between
    # dict[] and typing.Dict[] which DO NOT compare the same and DO NOT
    # behave the same yet there is NO WAY to distinguish between which type
    # it is using public attributes
    if re.match(
        "typing.(?:Dict|List|Set|.*Mapping|.*Sequence|.*Set)", repr(type_)
    ):
        return type_

    converted = tuple(
        ForwardRef(arg) if isinstance(arg, str) else arg
        for arg in get_args(type_)
    )
    # compat with py3.10 and earlier
    return container.__class_getitem__(converted)  # type: ignore

227 

228 

229def _copy_generic_annotation_with( 

230 annotation: GenericProtocol[_T], elements: Tuple[_AnnotationScanType, ...] 

231) -> Type[_T]: 

232 if hasattr(annotation, "copy_with"): 

233 # List, Dict, etc. real generics 

234 return annotation.copy_with(elements) # type: ignore 

235 else: 

236 # Python builtins list, dict, etc. 

237 return annotation.__origin__[elements] # type: ignore 

238 

239 

def eval_expression(
    expression: str,
    module_name: str,
    *,
    locals_: Optional[Mapping[str, Any]] = None,
    in_class: Optional[Type[Any]] = None,
) -> Any:
    """Evaluate ``expression`` against the globals of ``module_name``.

    :param expression: Python expression source.  It is passed to
     ``eval()``, so it must come from trusted code such as annotations.
    :param module_name: key into ``sys.modules`` supplying the globals.
    :param locals_: optional mapping used as the ``eval()`` locals.
    :param in_class: optional class whose ``__dict__`` and own name are
     made available underneath the module globals.
    :raises NameError: if the module is not in ``sys.modules`` or if the
     evaluation raises any ``Exception``.

    """
    try:
        module_globals: Dict[str, Any] = sys.modules[module_name].__dict__
    except KeyError as ke:
        raise NameError(
            f"Module {module_name} isn't present in sys.modules; can't "
            f"evaluate expression {expression}"
        ) from ke

    try:
        if in_class is None:
            eval_globals = module_globals
        else:
            # see #10899.  We want the locals/globals to take precedence
            # over the class namespace in this context, even though this
            # is not the usual way variables would resolve.  Later entries
            # win: class name < class namespace < module globals.
            eval_globals = {
                in_class.__name__: in_class,
                **in_class.__dict__,
                **module_globals,
            }

        return eval(expression, eval_globals, locals_)
    except Exception as err:
        raise NameError(
            f"Could not de-stringify annotation {expression!r}"
        ) from err

274 

275 

def eval_name_only(
    name: str,
    module_name: str,
    *,
    locals_: Optional[Mapping[str, Any]] = None,
) -> Any:
    """Resolve a bare ``name`` in ``module_name`` without calling ``eval()``.

    Dotted names are handed to :func:`eval_expression`.  A plain name is
    looked up in the module globals, then in ``builtins``.

    :raises NameError: if the module is not in ``sys.modules`` or the name
     is found in neither namespace.

    """
    if "." in name:
        return eval_expression(name, module_name, locals_=locals_)

    try:
        module_globals: Dict[str, Any] = sys.modules[module_name].__dict__
    except KeyError as ke:
        raise NameError(
            f"Module {module_name} isn't present in sys.modules; can't "
            f"resolve name {name}"
        ) from ke

    # name only, just look in globals.  eval() works perfectly fine here,
    # however we are seeking to have this be faster, as this occurs for
    # every Mapper[] keyword, etc. depending on configuration
    try:
        return module_globals[name]
    except KeyError as ke:
        # check in builtins as well to handle `list`, `set` or `dict`, etc.
        builtin_names = builtins.__dict__
        if name in builtin_names:
            return builtin_names[name]

        raise NameError(
            f"Could not locate name {name} in module {module_name}"
        ) from ke

308 

309 

def resolve_name_to_real_class_name(name: str, module_name: str) -> str:
    """Return the ``__name__`` of the object ``name`` resolves to.

    ``name`` itself is returned when it cannot be resolved or when the
    resolved object has no ``__name__`` attribute.

    """
    try:
        resolved = eval_name_only(name, module_name)
    except NameError:
        return name
    return getattr(resolved, "__name__", name)

317 

318 

def is_pep593(type_: Optional[Any]) -> bool:
    """Return True if ``type_`` is a PEP 593 ``Annotated[...]`` construct."""
    if type_ is None:
        return False
    return get_origin(type_) in _type_tuples.Annotated

321 

322 

323def is_non_string_iterable(obj: Any) -> TypeGuard[Iterable[Any]]: 

324 return isinstance(obj, collections_abc.Iterable) and not isinstance( 

325 obj, (str, bytes) 

326 ) 

327 

328 

def is_literal(type_: Any) -> bool:
    """Return True if the origin of ``type_`` is a ``Literal`` form."""
    origin = get_origin(type_)
    return origin in _type_tuples.Literal

331 

332 

def is_newtype(type_: Optional[_AnnotationScanType]) -> TypeGuard[NewType]:
    """Return True if ``type_`` is an instance of a known ``NewType``."""
    newtype_classes = _type_tuples.NewType
    return isinstance(type_, newtype_classes)

335 

336 

337def is_generic(type_: _AnnotationScanType) -> TypeGuard[GenericProtocol[Any]]: 

338 return hasattr(type_, "__args__") and hasattr(type_, "__origin__") 

339 

340 

def is_pep695(type_: _AnnotationScanType) -> TypeGuard[TypeAliasType]:
    """Return True if ``type_`` is a PEP 695 ``TypeAliasType``.

    A parameterized generic is judged by its ``__origin__``; an
    ``Annotated[...]`` construct is never considered a type alias.

    """
    # NOTE: a generic TAT does not instance check as TypeAliasType outside of
    # python 3.10.  For sqlalchemy use cases it's fine to consider it a TAT
    # though.
    # NOTE: things seems to work also without this additional check
    if not is_generic(type_):
        return isinstance(type_, _type_instances.TypeAliasType)

    if is_pep593(type_):
        return False
    return is_pep695(type_.__origin__)

351 

352 

def pep695_values(type_: _AnnotationScanType) -> Set[Any]:
    """Extracts the value from a TypeAliasType, recursively exploring unions
    and inner TypeAliasType to flatten them into a single set.

    Forward references are not evaluated, so no recursive exploration happens
    into them.  When the alias value is a union, ``NoneType`` members and
    ``ForwardRef("None")`` members are stored as ``None`` in the result.
    """
    visited = set()

    def expand(candidate):
        if candidate in visited:
            # recursion are not supported (at least it's flagged as
            # an error by pyright).  Just avoid infinite loop
            return candidate
        visited.add(candidate)
        if not is_pep695(candidate):
            return candidate
        aliased = candidate.__value__
        if is_union(aliased):
            # nested lists are flattened by the caller below
            return [expand(member) for member in aliased.__args__]
        return aliased

    expanded = expand(type_)
    if not isinstance(expanded, list):
        return {expanded}

    flattened = set()
    pending = deque(expanded)
    while pending:
        item = pending.popleft()
        if isinstance(item, list):
            pending.extend(item)
            continue
        if item is NoneType or is_fwd_none(item):
            flattened.add(None)
        else:
            flattened.add(item)
    return flattened

388 

389 

@overload
def is_fwd_ref(
    type_: _AnnotationScanType,
    check_generic: bool = ...,
    check_for_plain_string: Literal[False] = ...,
) -> TypeGuard[ForwardRef]: ...


@overload
def is_fwd_ref(
    type_: _AnnotationScanType,
    check_generic: bool = ...,
    check_for_plain_string: bool = ...,
) -> TypeGuard[Union[str, ForwardRef]]: ...


def is_fwd_ref(
    type_: _AnnotationScanType,
    check_generic: bool = False,
    check_for_plain_string: bool = False,
) -> TypeGuard[Union[str, ForwardRef]]:
    """Return True if ``type_`` is, or contains, a forward reference.

    :param type_: the annotation to inspect.
    :param check_generic: when True, the ``__args__`` of a generic are
     searched recursively as well.
    :param check_for_plain_string: when True, a plain ``str`` also counts
     as a forward reference.

    """
    if check_for_plain_string and isinstance(type_, str):
        return True
    if isinstance(type_, _type_instances.ForwardRef):
        return True
    if not (check_generic and is_generic(type_)):
        return False

    for arg in type_.__args__:
        if is_fwd_ref(
            arg, True, check_for_plain_string=check_for_plain_string
        ):
            return True
    return False

424 

425 

@overload
def de_optionalize_union_types(type_: str) -> str: ...


@overload
def de_optionalize_union_types(type_: Type[Any]) -> Type[Any]: ...


@overload
def de_optionalize_union_types(type_: _MatchedOnType) -> _MatchedOnType: ...


@overload
def de_optionalize_union_types(
    type_: _AnnotationScanType,
) -> _AnnotationScanType: ...


def de_optionalize_union_types(
    type_: _AnnotationScanType,
) -> _AnnotationScanType:
    """Given a type, filter out ``Union`` types that include ``NoneType``
    to not include the ``NoneType``.

    Forward references are handled textually, without evaluating them.
    For a real union that allows ``None``, the ``NoneType`` and
    ``ForwardRef("None")`` members are dropped and a union of the remaining
    members is returned.  Any other type is returned unchanged.

    """

    if is_fwd_ref(type_):
        return _de_optionalize_fwd_ref_union_types(type_, False)

    elif is_union(type_) and includes_none(type_):
        # keep the members in declaration order; collecting them in a set
        # would make the order (and repr) of the resulting Union depend on
        # hashing.  Union[] removes duplicates by itself.
        remaining = tuple(
            t
            for t in type_.__args__
            if t is not NoneType and not is_fwd_none(t)
        )

        return make_union_type(*remaining)

    else:
        return type_

466 

467 

468@overload 

469def _de_optionalize_fwd_ref_union_types( 

470 type_: ForwardRef, return_has_none: Literal[True] 

471) -> bool: ... 

472 

473 

474@overload 

475def _de_optionalize_fwd_ref_union_types( 

476 type_: ForwardRef, return_has_none: Literal[False] 

477) -> _AnnotationScanType: ... 

478 

479 

480def _de_optionalize_fwd_ref_union_types( 

481 type_: ForwardRef, return_has_none: bool 

482) -> Union[_AnnotationScanType, bool]: 

483 """return the non-optional type for Optional[], Union[None, ...], x|None, 

484 etc. without de-stringifying forward refs. 

485 

486 unfortunately this seems to require lots of hardcoded heuristics 

487 

488 """ 

489 

490 annotation = type_.__forward_arg__ 

491 

492 mm = re.match(r"^(.+?)\[(.+)\]$", annotation) 

493 if mm: 

494 g1 = mm.group(1).split(".")[-1] 

495 if g1 == "Optional": 

496 return True if return_has_none else ForwardRef(mm.group(2)) 

497 elif g1 == "Union": 

498 if "[" in mm.group(2): 

499 # cases like "Union[Dict[str, int], int, None]" 

500 elements: list[str] = [] 

501 current: list[str] = [] 

502 ignore_comma = 0 

503 for char in mm.group(2): 

504 if char == "[": 

505 ignore_comma += 1 

506 elif char == "]": 

507 ignore_comma -= 1 

508 elif ignore_comma == 0 and char == ",": 

509 elements.append("".join(current).strip()) 

510 current.clear() 

511 continue 

512 current.append(char) 

513 else: 

514 elements = re.split(r",\s*", mm.group(2)) 

515 parts = [ForwardRef(elem) for elem in elements if elem != "None"] 

516 if return_has_none: 

517 return len(elements) != len(parts) 

518 else: 

519 return make_union_type(*parts) if parts else Never # type: ignore[return-value] # noqa: E501 

520 else: 

521 return False if return_has_none else type_ 

522 

523 pipe_tokens = re.split(r"\s*\|\s*", annotation) 

524 has_none = "None" in pipe_tokens 

525 if return_has_none: 

526 return has_none 

527 if has_none: 

528 anno_str = "|".join(p for p in pipe_tokens if p != "None") 

529 return ForwardRef(anno_str) if anno_str else Never # type: ignore[return-value] # noqa: E501 

530 

531 return type_ 

532 

533 

def make_union_type(*types: _AnnotationScanType) -> Type[Any]:
    """Make a Union type.

    The positional arguments become the members of the union, i.e. the
    call is equivalent to subscripting ``Union`` with the argument tuple.

    """
    members = types
    return Union[members]  # type: ignore

538 

539 

def includes_none(type_: Any) -> bool:
    """Returns if the type annotation ``type_`` allows ``None``.

    This function supports:
    * forward refs
    * unions
    * pep593 - Annotated
    * pep695 - TypeAliasType (does not support looking into
      fw reference of other pep695)
    * NewType
    * plain types like ``int``, ``None``, etc
    """
    if is_fwd_ref(type_):
        return _de_optionalize_fwd_ref_union_types(type_, True)

    if is_union(type_):
        return any(map(includes_none, get_args(type_)))

    if is_pep593(type_):
        # the first argument of Annotated[] is the annotated type itself
        annotated_type = get_args(type_)[0]
        return includes_none(annotated_type)

    if is_pep695(type_):
        return any(map(includes_none, pep695_values(type_)))

    if is_newtype(type_):
        return includes_none(type_.__supertype__)

    try:
        return type_ in (NoneType, None) or is_fwd_none(type_)
    except TypeError:
        # if type_ is Column, mapped_column(), etc. the use of "in"
        # resolves to ``__eq__()`` which then gives us an expression object
        # that can't resolve to boolean.  just catch it all via exception
        return False

569 

570 

def is_a_type(type_: Any) -> bool:
    """Return True if ``type_`` looks like a type or a typing construct.

    That is: a class, something with a typing origin, or an object whose
    own module or whose class' module is ``typing`` / ``typing_extensions``.

    """
    if isinstance(type_, type) or get_origin(type_) is not None:
        return True

    typing_modules = ("typing", "typing_extensions")
    if getattr(type_, "__module__", None) in typing_modules:
        return True
    return type(type_).__mro__[0].__module__ in typing_modules

579 

580 

def is_union(type_: Any) -> TypeGuard[ArgsTypeProtocol]:
    """Return True if the origin of ``type_`` is named ``Union`` or
    ``UnionType``."""
    union_origin_names = ("Union", "UnionType")
    return is_origin_of(type_, *union_origin_names)

583 

584 

def is_origin_of_cls(
    type_: Any, class_obj: Union[Tuple[Type[Any], ...], Type[Any]]
) -> bool:
    """return True if the given type has an __origin__ that shares a base
    with the given class"""

    origin = get_origin(type_)
    # a missing origin (None) is not a class either, so one check covers both
    if not isinstance(origin, type):
        return False

    return issubclass(origin, class_obj)

596 

597 

598def is_origin_of( 

599 type_: Any, *names: str, module: Optional[str] = None 

600) -> bool: 

601 """return True if the given type has an __origin__ with the given name 

602 and optional module.""" 

603 

604 origin = get_origin(type_) 

605 if origin is None: 

606 return False 

607 

608 return origin.__name__ in names and ( 

609 module is None or origin.__module__.startswith(module) 

610 ) 

611 

612 

class DescriptorProto(Protocol):
    """Structural type for objects implementing the descriptor methods
    ``__get__``, ``__set__`` and ``__delete__``."""

    def __get__(self, instance: object, owner: Any) -> Any: ...

    def __set__(self, instance: Any, value: Any) -> None: ...

    def __delete__(self, instance: Any) -> None: ...

619 

620 

# any type satisfying DescriptorProto
_DESC = TypeVar("_DESC", bound=DescriptorProto)


class DescriptorReference(Generic[_DESC]):
    """a descriptor that refers to a descriptor.

    used for cases where we need to have an instance variable referring to an
    object that is itself a descriptor, which typically confuses typing tools
    as they don't know when they should use ``__get__`` or not when referring
    to the descriptor assignment as an instance variable. See
    sqlalchemy.orm.interfaces.PropComparator.prop

    """

    # the methods below are declared for type checkers only; they are not
    # defined at runtime
    if TYPE_CHECKING:

        def __get__(self, instance: object, owner: Any) -> _DESC: ...

        def __set__(self, instance: Any, value: _DESC) -> None: ...

        def __delete__(self, instance: Any) -> None: ...

642 

643 

# covariant counterpart of _DESC
_DESC_co = TypeVar("_DESC_co", bound=DescriptorProto, covariant=True)


class RODescriptorReference(Generic[_DESC_co]):
    """a descriptor that refers to a descriptor.

    same as :class:`.DescriptorReference` but is read-only, so that subclasses
    can define a subtype as the generically contained element

    """

    # the methods below are declared for type checkers only; they are not
    # defined at runtime.  ``__set__`` and ``__delete__`` are typed as
    # ``NoReturn``.
    if TYPE_CHECKING:

        def __get__(self, instance: object, owner: Any) -> _DESC_co: ...

        def __set__(self, instance: Any, value: Any) -> NoReturn: ...

        def __delete__(self, instance: Any) -> NoReturn: ...

662 

663 

# a callable, or None
_FN = TypeVar("_FN", bound=Optional[Callable[..., Any]])


class CallableReference(Generic[_FN]):
    """a descriptor that refers to a callable.

    works around mypy's limitation of not allowing callables assigned
    as instance variables


    """

    # the methods below are declared for type checkers only; they are not
    # defined at runtime
    if TYPE_CHECKING:

        def __get__(self, instance: object, owner: Any) -> _FN: ...

        def __set__(self, instance: Any, value: _FN) -> None: ...

        def __delete__(self, instance: Any) -> None: ...

683 

684 

class _TypingInstances:
    """Lazy lookup of a typing construct in ``typing`` and
    ``typing_extensions``.

    Accessing an attribute returns a tuple of the distinct objects found
    under that name in the two modules; the tuple is stored on the instance
    so ``__getattr__`` is not consulted again for the same name.

    """

    def __getattr__(self, key: str) -> tuple[type, ...]:
        found = set()
        for module in (typing, typing_extensions):
            candidate = getattr(module, key, None)
            if candidate is not None:
                # a set, so that an object shared by both modules is
                # listed only once
                found.add(candidate)

        if not found:
            raise AttributeError(key)

        types = tuple(found)
        self.__dict__[key] = types
        return types

701 

702 

# shared lookup object; each attribute is a tuple of the matching objects
# from ``typing`` and ``typing_extensions``
_type_tuples = _TypingInstances()
if TYPE_CHECKING:
    # type checkers see the typing_extensions module here
    _type_instances = typing_extensions
else:
    # at runtime the same lookup object is used, so attributes are tuples
    # suitable for isinstance()
    _type_instances = _type_tuples

# tuple of the ``Literal`` special forms found by the lookup above
LITERAL_TYPES = _type_tuples.Literal