Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1019 statements  

1# util/langhelpers.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Routines to help with the creation, loading and introspection of 

10modules, classes, hierarchies, attributes, functions, and methods. 

11 

12""" 

13from __future__ import annotations 

14 

15import collections 

16import enum 

17from functools import update_wrapper 

18import importlib.util 

19import inspect 

20import itertools 

21import operator 

22import re 

23import sys 

24import textwrap 

25import threading 

26import types 

27from types import CodeType 

28from types import ModuleType 

29from typing import Any 

30from typing import Callable 

31from typing import cast 

32from typing import Dict 

33from typing import FrozenSet 

34from typing import Generic 

35from typing import Iterator 

36from typing import List 

37from typing import Literal 

38from typing import Mapping 

39from typing import NoReturn 

40from typing import Optional 

41from typing import overload 

42from typing import Sequence 

43from typing import Set 

44from typing import Tuple 

45from typing import Type 

46from typing import TYPE_CHECKING 

47from typing import TypeVar 

48from typing import Union 

49import warnings 

50 

51from . import _collections 

52from . import compat 

53from .. import exc 

54 

# Generic type variables shared by the helpers in this module.

# unconstrained, invariant type variable
_T = TypeVar("_T")

# covariant type variable; used as the value type of generic_fn_descriptor
_T_co = TypeVar("_T_co", covariant=True)

# any callable
_F = TypeVar("_F", bound=Callable[..., Any])

# bound (as a forward reference) to HasMemoized.memoized_attribute
_MA = TypeVar("_MA", bound="HasMemoized.memoized_attribute[Any]")

# any module object
_M = TypeVar("_M", bound=ModuleType)

60 

# get_annotations() is defined differently depending on the running Python
# version, as reported by compat.py314.
if compat.py314:

    # imported only on this branch; the other branch does not need it
    import annotationlib

    def get_annotations(obj: Any) -> Mapping[str, Any]:
        """Return the annotations mapping for ``obj``.

        Delegates to ``annotationlib.get_annotations()`` using the
        ``FORWARDREF`` format.

        """
        return annotationlib.get_annotations(
            obj, format=annotationlib.Format.FORWARDREF
        )

else:

    def get_annotations(obj: Any) -> Mapping[str, Any]:
        """Return the annotations mapping for ``obj``.

        Delegates to ``inspect.get_annotations()``.

        """
        return inspect.get_annotations(obj)

74 

75 

def restore_annotations(
    cls: type, new_annotations: dict[str, Any]
) -> Callable[[], None]:
    """Install alternate annotations on a class and return an "undo" callable.

    ``cls.__annotations__`` is replaced with ``new_annotations``.  Calling
    the returned function puts back the attribute that was captured before
    the swap, or deletes it if the class had none.

    This is used strictly to provide dataclasses on a mapped class, where
    in some cases we are making dataclass fields based on an attribute
    that is actually a python descriptor on a superclass which we called
    to get a value.  If dataclasses were to give us a way to achieve this
    without swapping ``__annotations__``, that would be much better.

    """
    # sentinel meaning "the attribute was absent; delete it on restore"
    absent = object()

    # pep-649 means classes have "__annotate__", and it's a callable.  if
    # it's there and is None, we're in "legacy future mode", where it's
    # python 3.14 or higher and "from __future__ import annotations" is set.
    # in "legacy future mode" we have to do the same steps we do for older
    # pythons; __annotate__ can be ignored
    uses_annotate = getattr(cls, "__annotate__", None) is not None

    saved_attr = "__annotate__" if uses_annotate else "__annotations__"
    saved_value = getattr(cls, saved_attr, absent)

    cls.__annotations__ = new_annotations

    def restore():
        if saved_value is absent:
            delattr(cls, saved_attr)
        else:
            setattr(cls, saved_attr, saved_value)

    return restore

116 

117 

def md5_hex(x: Any) -> str:
    """Return the hex MD5 digest of ``x`` after encoding it as UTF-8.

    The hash object comes from ``compat.md5_not_for_security()``.

    """
    digest = compat.md5_not_for_security()
    digest.update(x.encode("utf-8"))
    return cast(str, digest.hexdigest())

123 

124 

class safe_reraise:
    """Context manager that re-raises an exception once handler code has run.

    The exception info that is current when the block is entered is
    captured up front, so that it is maintained across a potential
    coroutine context switch inside the block.

    e.g.::

        try:
            sess.commit()
        except:
            with safe_reraise():
                sess.rollback()

    If the block itself raises, that newer exception is the one re-raised;
    otherwise the exception captured on entry is raised.

    TODO: we should at some point evaluate current behaviors in this regard
    based on current greenlet, gevent/eventlet implementations in Python 3, and
    also see the degree to which our own asyncio (based on greenlet also) is
    impacted by this. .rollback() will cause IO / context switch to occur in
    all these scenarios; what happens to the exception context from an
    "except:" block if we don't explicitly store it?  Original issue was #2703.

    """

    __slots__ = ("_exc_info",)

    _exc_info: Union[
        None,
        Tuple[
            Type[BaseException],
            BaseException,
            types.TracebackType,
        ],
        Tuple[None, None, None],
    ]

    def __enter__(self) -> None:
        # snapshot the exception currently being handled
        self._exc_info = sys.exc_info()

    def __exit__(
        self,
        type_: Optional[Type[BaseException]],
        value: Optional[BaseException],
        traceback: Optional[types.TracebackType],
    ) -> NoReturn:
        assert self._exc_info is not None
        # see #2703 for notes
        if type_ is not None:
            # the handler code failed; propagate that failure
            self._exc_info = None  # remove potential circular references
            assert value is not None
            raise value.with_traceback(traceback)

        # the handler code succeeded; re-raise what was captured on entry
        _, captured_value, captured_tb = self._exc_info
        assert captured_value is not None
        self._exc_info = None  # remove potential circular references
        raise captured_value.with_traceback(captured_tb)

182 

183 

def walk_subclasses(cls: Type[_T]) -> Iterator[Type[_T]]:
    """Yield ``cls`` and every direct or indirect subclass, each once.

    Traversal is depth-first using an explicit stack; a class reachable
    through several inheritance paths is yielded only the first time it is
    popped.

    """
    visited: Set[Any] = set()
    pending = [cls]

    while pending:
        current = pending.pop()
        if current not in visited:
            visited.add(current)
            pending.extend(current.__subclasses__())
            yield current

196 

197 

def string_or_unprintable(element: Any) -> str:
    """Return ``element`` as a string, falling back to a placeholder.

    A ``str`` is returned unchanged.  Anything else is passed to ``str()``;
    if that raises, a message built from ``repr(element)`` is returned
    instead.

    """
    if isinstance(element, str):
        return element

    try:
        return str(element)
    except Exception:
        # wrap in a 1-tuple: a bare tuple (or tuple subclass) on the right
        # of "%" is taken as the argument list and would raise TypeError
        return "unprintable element %r" % (element,)

206 

207 

def clsname_as_plain_name(
    cls: Type[Any], use_name: Optional[str] = None
) -> str:
    """Turn a CamelCase class name into lower-case, space-separated words.

    ``use_name``, when given and non-empty, is used instead of
    ``cls.__name__``.  Only capitalized words and the literal token ``SQL``
    are picked up; any other characters are dropped.

    """
    source_name = use_name or cls.__name__
    words = re.findall(r"([A-Z][a-z]+|SQL)", source_name)
    return " ".join(word.lower() for word in words)

213 

214 

def method_is_overridden(
    instance_or_cls: Union[Type[Any], object],
    against_method: Callable[..., Any],
) -> bool:
    """Return True if the class's method differs from ``against_method``.

    ``instance_or_cls`` may be a class or an instance; for an instance its
    class is used.  The attribute named ``against_method.__name__`` is
    looked up on that class and compared to ``against_method`` with ``!=``.

    """
    if isinstance(instance_or_cls, type):
        owner = instance_or_cls
    else:
        owner = instance_or_cls.__class__

    current_method: types.MethodType = getattr(owner, against_method.__name__)
    return current_method != against_method

231 

232 

def decode_slice(slc: slice) -> Tuple[Any, ...]:
    """Decode a slice object as sent to ``__getitem__``.

    Returns ``(start, stop, step)``.  Any member that has an
    ``__index__()`` method is replaced by the result of calling it; other
    members (such as ``None``) are passed through unchanged.

    """
    return tuple(
        part.__index__() if hasattr(part, "__index__") else part
        for part in (slc.start, slc.stop, slc.step)
    )

245 

246 

247def _unique_symbols(used: Sequence[str], *bases: str) -> Iterator[str]: 

248 used_set = set(used) 

249 for base in bases: 

250 pool = itertools.chain( 

251 (base,), 

252 map(lambda i: base + str(i), range(1000)), 

253 ) 

254 for sym in pool: 

255 if sym not in used_set: 

256 used_set.add(sym) 

257 yield sym 

258 break 

259 else: 

260 raise NameError("exhausted namespace for symbol base %s" % base) 

261 

262 

def map_bits(fn: Callable[[int], Any], n: int) -> Iterator[Any]:
    """Yield ``fn(bit)`` for each set bit of ``n``, lowest bit first.

    ``bit`` is the bit's value (1, 2, 4, ...), not its position.

    """
    remaining = n
    while remaining:
        # two's-complement trick: isolates the lowest set bit
        lowest = remaining & -remaining
        yield fn(lowest)
        remaining ^= lowest

270 

271 

# type variable for the callable accepted and returned by decorator()
_Fn = TypeVar("_Fn", bound="Callable[..., Any]")

273 

274# this seems to be in flux in recent mypy versions 

275 

276 

def decorator(target: Callable[..., Any]) -> Callable[[_Fn], _Fn]:
    """A signature-matching decorator factory.

    ``target`` is called as ``target(fn, *args, **kw)`` each time the
    decorated function is invoked.  The returned ``decorate`` callable
    builds, from generated source code, a wrapper that has the same name
    and argument signature as the function it wraps and forwards its
    arguments to ``target``.  Coroutine functions get an ``async def``
    wrapper that awaits ``target``.

    ``decorate`` raises ``Exception`` if given something that is neither a
    function nor a method.

    """

    def decorate(fn: _Fn) -> _Fn:
        if not inspect.isfunction(fn) and not inspect.ismethod(fn):
            raise Exception("not a decoratable function")

        # Python 3.14 defer creating __annotations__ until its used.
        # We do not want to create __annotations__ now.
        annofunc = getattr(fn, "__annotate__", None)
        if annofunc is not None:
            # temporarily hide __annotate__ while the argspec is read, then
            # always put it back
            fn.__annotate__ = None  # type: ignore[union-attr]
            try:
                spec = compat.inspect_getfullargspec(fn)
            finally:
                fn.__annotate__ = annofunc  # type: ignore[union-attr]
        else:
            spec = compat.inspect_getfullargspec(fn)

        # Do not generate code for annotations.
        # update_wrapper() copies the annotation from fn to decorated.
        # We use dummy defaults for code generation to avoid having
        # copy of large globals for compiling.
        # We copy __defaults__ and __kwdefaults__ from fn to decorated.
        empty_defaults = (None,) * len(spec.defaults or ())
        empty_kwdefaults = dict.fromkeys(spec.kwonlydefaults or ())
        spec = spec._replace(
            annotations={},
            defaults=empty_defaults,
            kwonlydefaults=empty_kwdefaults,
        )

        # every name already used by the signature, plus the function's own
        # name; the generated code must not collide with any of them
        names = (
            tuple(cast("Tuple[str, ...]", spec[0]))
            + cast("Tuple[str, ...]", spec[1:3])
            + (fn.__name__,)
        )
        targ_name, fn_name = _unique_symbols(names, "target", "fn")

        # substitution values for the source templates below
        metadata: Dict[str, Optional[str]] = dict(target=targ_name, fn=fn_name)
        metadata.update(format_argspec_plus(spec, grouped=False))
        metadata["name"] = fn.__name__

        if inspect.iscoroutinefunction(fn):
            metadata["prefix"] = "async "
            metadata["target_prefix"] = "await "
        else:
            metadata["prefix"] = ""
            metadata["target_prefix"] = ""

        # look for __ positional arguments.  This is a convention in
        # SQLAlchemy that arguments should be passed positionally
        # rather than as keyword
        # arguments.  note that apply_pos doesn't currently work in all cases
        # such as when a kw-only indicator "*" is present, which is why
        # we limit the use of this to just that case we can detect.  As we add
        # more kinds of methods that use @decorator, things may have to
        # be further improved in this area
        if "__" in repr(spec[0]):
            code = (
                """\
%(prefix)sdef %(name)s%(grouped_args)s:
    return %(target_prefix)s%(target)s(%(fn)s, %(apply_pos)s)
"""
                % metadata
            )
        else:
            code = (
                """\
%(prefix)sdef %(name)s%(grouped_args)s:
    return %(target_prefix)s%(target)s(%(fn)s, %(apply_kw)s)
"""
                % metadata
            )

        # globals for the generated function: the two collision-free names
        # bound to the real objects, and the wrapped function's module name
        env: Dict[str, Any] = {
            targ_name: target,
            fn_name: fn,
            "__name__": fn.__module__,
        }

        decorated = cast(
            types.FunctionType,
            _exec_code_in_env(code, env, fn.__name__),
        )
        # replace the dummy defaults used during code generation with the
        # real ones
        decorated.__defaults__ = fn.__defaults__
        decorated.__kwdefaults__ = fn.__kwdefaults__  # type: ignore
        return update_wrapper(decorated, fn)  # type: ignore[return-value]

    return update_wrapper(decorate, target)  # type: ignore[return-value]

367 

368 

369def _exec_code_in_env( 

370 code: Union[str, types.CodeType], env: Dict[str, Any], fn_name: str 

371) -> Callable[..., Any]: 

372 exec(code, env) 

373 return env[fn_name] # type: ignore[no-any-return] 

374 

375 

# unconstrained type variables; their users are not in this section of the
# module
_PF = TypeVar("_PF")
_TE = TypeVar("_TE")

378 

379 

class PluginLoader:
    """Registry that resolves a plugin name to an object on demand.

    Loader callables are cached in ``impls`` by name.  ``load()`` consults,
    in order: the cache, the optional ``auto_fn`` hook, and the entries
    returned by ``compat.importlib_metadata_get()`` for ``group``.

    """

    def __init__(
        self, group: str, auto_fn: Optional[Callable[..., Any]] = None
    ):
        self.group = group
        self.impls: Dict[str, Any] = {}
        self.auto_fn = auto_fn

    def clear(self):
        """Forget every cached or registered loader."""
        self.impls.clear()

    def load(self, name: str) -> Any:
        """Return the plugin object registered as ``name``.

        Raises ``exc.NoSuchModuleError`` when no source provides it.

        """
        try:
            cached = self.impls[name]
        except KeyError:
            pass
        else:
            return cached()

        if self.auto_fn:
            loader = self.auto_fn(name)
            if loader:
                self.impls[name] = loader
                return loader()

        for entry in compat.importlib_metadata_get(self.group):
            if entry.name == name:
                self.impls[name] = entry.load
                return entry.load()

        raise exc.NoSuchModuleError(
            "Can't load plugin: %s:%s" % (self.group, name)
        )

    def register(self, name: str, modulepath: str, objname: str) -> None:
        """Register ``objname`` in module ``modulepath`` under ``name``.

        Nothing is imported until the plugin is first loaded.

        """

        def load():
            # __import__ returns the top-level package; walk down the
            # remaining dotted path to reach the actual module
            _, *submodules = modulepath.split(".")
            module = __import__(modulepath)
            for token in submodules:
                module = getattr(module, token)
            return getattr(module, objname)

        self.impls[name] = load

    def deregister(self, name: str) -> None:
        """Remove the loader stored under ``name``; KeyError if absent."""
        del self.impls[name]

421 

422 

423def _inspect_func_args(fn): 

424 try: 

425 co_varkeywords = inspect.CO_VARKEYWORDS 

426 except AttributeError: 

427 # https://docs.python.org/3/library/inspect.html 

428 # The flags are specific to CPython, and may not be defined in other 

429 # Python implementations. Furthermore, the flags are an implementation 

430 # detail, and can be removed or deprecated in future Python releases. 

431 spec = compat.inspect_getfullargspec(fn) 

432 return spec[0], bool(spec[2]) 

433 else: 

434 # use fn.__code__ plus flags to reduce method call overhead 

435 co = fn.__code__ 

436 nargs = co.co_argcount 

437 return ( 

438 list(co.co_varnames[:nargs]), 

439 bool(co.co_flags & co_varkeywords), 

440 ) 

441 

442 

@overload
def get_cls_kwargs(
    cls: type,
    *,
    _set: Optional[Set[str]] = None,
    raiseerr: Literal[True] = ...,
) -> Set[str]: ...


@overload
def get_cls_kwargs(
    cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False
) -> Optional[Set[str]]: ...


def get_cls_kwargs(
    cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False
) -> Optional[Set[str]]:
    r"""Return the full set of inherited kwargs for the given `cls`.

    Probes a class's __init__ method, collecting all named arguments.  If the
    __init__ defines a \**kwargs catch-all, then the constructor is presumed
    to pass along unrecognized keywords to its base classes, and the
    collection process is repeated recursively on each of the bases.  A
    class with no Python-level __init__ of its own is treated the same way.

    Uses a subset of inspect.getfullargspec() to cut down on method overhead,
    as this is used within the Core typing system to create copies of type
    objects which is a performance-sensitive operation.

    ``_set`` is the accumulator passed along during recursion; a call that
    supplies it is not considered the top-level call.  In such a call, a
    class whose __init__ has no \**kwargs yields ``None``, or a TypeError
    when ``raiseerr`` is true.

    No anonymous tuple arguments please !

    """
    toplevel = _set is None
    if toplevel:
        _set = set()
    assert _set is not None

    # only an __init__ defined directly on this class counts
    ctr = cls.__dict__.get("__init__", False)
    has_init = (
        ctr
        and isinstance(ctr, types.FunctionType)
        and isinstance(ctr.__code__, types.CodeType)
    )

    has_kw = False
    if has_init:
        names, has_kw = _inspect_func_args(ctr)
        _set.update(names)

        if not (has_kw or toplevel):
            if raiseerr:
                raise TypeError(
                    f"given cls {cls} doesn't have an __init__ method"
                )
            return None

    if has_kw or not has_init:
        for base in cls.__bases__:
            # stop at the first base that reports a closed signature
            if get_cls_kwargs(base, _set=_set) is None:
                break

    _set.discard("self")
    return _set

509 

510 

def get_func_kwargs(func: Callable[..., Any]) -> List[str]:
    """Return the list of positional argument names for the given `func`.

    This is the first member of the full argspec obtained from
    ``compat.inspect_getfullargspec()``.

    """
    spec = compat.inspect_getfullargspec(func)
    return spec[0]

520 

521 

def get_callable_argspec(
    fn: Callable[..., Any], no_self: bool = False, _is_init: bool = False
) -> compat.FullArgSpec:
    """Return the argument signature for any callable.

    All pure-Python callables are accepted, including
    functions, methods, classes, objects with __call__;
    builtins and other edge cases like functools.partial() objects
    raise a TypeError.

    With ``no_self`` set, the leading ``self`` argument is removed from
    the result for methods and for ``__init__`` reached through a class.

    """

    def _without_first_arg(spec):
        # same spec, minus the first positional argument
        return compat.FullArgSpec(
            spec.args[1:],
            spec.varargs,
            spec.varkw,
            spec.defaults,
            spec.kwonlyargs,
            spec.kwonlydefaults,
            spec.annotations,
        )

    if inspect.isbuiltin(fn):
        raise TypeError("Can't inspect builtin: %s" % fn)

    if inspect.isfunction(fn):
        strip_self = _is_init and no_self
        spec = compat.inspect_getfullargspec(fn)
        return _without_first_arg(spec) if strip_self else spec

    if inspect.ismethod(fn):
        strip_self = no_self and (_is_init or fn.__self__)
        spec = compat.inspect_getfullargspec(fn.__func__)
        return _without_first_arg(spec) if strip_self else spec

    if inspect.isclass(fn):
        return get_callable_argspec(
            fn.__init__, no_self=no_self, _is_init=True
        )

    if hasattr(fn, "__func__"):
        return compat.inspect_getfullargspec(fn.__func__)

    if hasattr(fn, "__call__") and inspect.ismethod(fn.__call__):
        return get_callable_argspec(fn.__call__, no_self=no_self)

    raise TypeError("Can't inspect callable: %s" % fn)

576 

577 

def format_argspec_plus(
    fn: Union[Callable[..., Any], compat.FullArgSpec], grouped: bool = True
) -> Dict[str, Optional[str]]:
    """Returns a dictionary of formatted, introspected function arguments.

    A enhanced variant of inspect.formatargspec to support code generation.

    fn
       An inspectable callable or tuple of inspect getargspec() results.
    grouped
      Defaults to True; include (parens, around, argument) lists.  When
      False, the enclosing parenthesis are stripped from every ``apply_*``
      value; ``grouped_args`` keeps them in both cases.

    Returns:

    grouped_args
      Full inspect.formatargspec for fn
    self_arg
      The name of the first positional argument, varargs[0], or None
      if the function defines no positional arguments.
    apply_pos
      args, re-written in calling rather than receiving syntax.  Arguments are
      passed positionally.
    apply_kw
      Like apply_pos, except keyword-ish args are passed as keywords.
    apply_pos_proxied
      Like apply_pos but omits the self/cls argument
    apply_kw_proxied
      Like apply_kw but omits the self/cls argument

    Example (the ``*_proxied`` keys are not shown)::

      >>> format_argspec_plus(lambda self, a, b, c=3, **d: 123)
      {'grouped_args': '(self, a, b, c=3, **d)',
       'self_arg': 'self',
       'apply_kw': '(self, a, b, c=c, **d)',
       'apply_pos': '(self, a, b, c, **d)'}

    """
    if callable(fn):
        spec = compat.inspect_getfullargspec(fn)
    else:
        spec = fn

    # spec is indexed positionally below:
    # 0=args, 1=varargs, 2=varkw, 3=defaults, 4=kwonlyargs
    args = compat.inspect_formatargspec(*spec)

    # same argument list with no default values rendered
    apply_pos = compat.inspect_formatargspec(
        spec[0], spec[1], spec[2], None, spec[4]
    )

    if spec[0]:
        self_arg = spec[0][0]

        apply_pos_proxied = compat.inspect_formatargspec(
            spec[0][1:], spec[1], spec[2], None, spec[4]
        )

    elif spec[1]:
        # I'm not sure what this is
        # (no named positional arguments, only *varargs: self_arg becomes
        # an expression indexing the first element of the varargs name)
        self_arg = "%s[0]" % spec[1]

        apply_pos_proxied = apply_pos
    else:
        self_arg = None
        apply_pos_proxied = apply_pos

    # count of arguments that are rendered in "name=name" form: positional
    # defaults plus all keyword-only arguments
    num_defaults = 0
    if spec[3]:
        num_defaults += len(cast(Tuple[Any], spec[3]))
    if spec[4]:
        num_defaults += len(spec[4])

    name_args = spec[0] + spec[4]

    defaulted_vals: Union[List[str], Tuple[()]]

    if num_defaults:
        # the trailing names double as their own "default values", which
        # produces "c=c" style output through formatvalue below
        defaulted_vals = name_args[0 - num_defaults :]
    else:
        defaulted_vals = ()

    apply_kw = compat.inspect_formatargspec(
        name_args,
        spec[1],
        spec[2],
        defaulted_vals,
        formatvalue=lambda x: "=" + str(x),
    )

    if spec[0]:
        apply_kw_proxied = compat.inspect_formatargspec(
            name_args[1:],
            spec[1],
            spec[2],
            defaulted_vals,
            formatvalue=lambda x: "=" + str(x),
        )
    else:
        apply_kw_proxied = apply_kw

    if grouped:
        return dict(
            grouped_args=args,
            self_arg=self_arg,
            apply_pos=apply_pos,
            apply_kw=apply_kw,
            apply_pos_proxied=apply_pos_proxied,
            apply_kw_proxied=apply_kw_proxied,
        )
    else:
        # [1:-1] strips the enclosing parenthesis
        return dict(
            grouped_args=args,
            self_arg=self_arg,
            apply_pos=apply_pos[1:-1],
            apply_kw=apply_kw[1:-1],
            apply_pos_proxied=apply_pos_proxied[1:-1],
            apply_kw_proxied=apply_kw_proxied[1:-1],
        )

693 

694 

def format_argspec_init(method, grouped=True):
    """format_argspec_plus with considerations for typical __init__ methods

    Delegates to format_argspec_plus, substituting fixed results for the
    two cases that cannot be introspected:

    .. sourcecode:: text

      object.__init__ -> (self)
      other unreflectable (usually C) -> (self, *args, **kwargs)

    """
    if method is object.__init__:
        grouped_args = "(self)"
        bare_args = "self"
        proxied_grouped = "()"
        proxied_bare = ""
    else:
        try:
            return format_argspec_plus(method, grouped=grouped)
        except TypeError:
            grouped_args = "(self, *args, **kwargs)"
            bare_args = "self, *args, **kwargs"
            proxied_grouped = "(*args, **kwargs)"
            proxied_bare = "*args, **kwargs"

    args = grouped_args if grouped else bare_args
    proxied = proxied_grouped if grouped else proxied_bare

    return {
        "self_arg": "self",
        "grouped_args": grouped_args,
        "apply_pos": args,
        "apply_kw": args,
        "apply_pos_proxied": proxied,
        "apply_kw_proxied": proxied,
    }

726 

727 

def create_proxy_methods(
    target_cls: Type[Any],
    target_cls_sphinx_name: str,
    proxy_cls_sphinx_name: str,
    classmethods: Sequence[str] = (),
    methods: Sequence[str] = (),
    attributes: Sequence[str] = (),
    use_intermediate_variable: Sequence[str] = (),
) -> Callable[[_T], _T]:
    """A class decorator indicating attributes should refer to a proxy
    class.

    At runtime this is only a "marker": none of the arguments are used and
    the decorated class is returned unchanged.  The arguments are consumed
    instead by the tools/generate_proxy_methods.py script, which statically
    generates proxy methods and attributes that are fully recognized by
    typing tools such as mypy.

    """

    def decorate(cls):
        # no-op; hand the class straight back
        return cls

    return decorate

751 

752 

def getargspec_init(method):
    """inspect.getargspec with considerations for typical __init__ methods

    Returns the full argspec when ``method`` can be introspected.  When
    introspection raises TypeError, a 4-tuple fallback is returned:

    .. sourcecode:: text

      object.__init__ -> (self)
      other unreflectable (usually C) -> (self, *args, **kwargs)

    """
    try:
        return compat.inspect_getfullargspec(method)
    except TypeError:
        pass

    if method is object.__init__:
        return (["self"], None, None, None)
    return (["self"], "args", "kwargs", None)

771 

772 

def unbound_method_to_callable(func_or_cls):
    """Adjust the incoming callable such that a 'self' argument is not
    required.

    A method object whose ``__self__`` is falsy is replaced by its
    underlying function; anything else is returned as given.

    """
    is_selfless_method = (
        isinstance(func_or_cls, types.MethodType)
        and not func_or_cls.__self__
    )
    return func_or_cls.__func__ if is_selfless_method else func_or_cls

783 

784 

def generic_repr(
    obj: Any,
    additional_kw: Sequence[Tuple[str, Any]] = (),
    to_inspect: Optional[Union[object, List[object]]] = None,
    omit_kwarg: Sequence[str] = (),
) -> str:
    """Produce a __repr__() based on direct association of the __init__()
    specification vs. same-named attributes present.

    obj
      The object to represent; attribute values are read from it.
    additional_kw
      Extra ``(name, default)`` pairs rendered as keywords when the
      attribute is present on ``obj`` and differs from ``default``.
    to_inspect
      Object or list of objects whose ``__init__`` signatures are
      inspected; defaults to ``obj`` itself.  Required arguments of the
      first entry are rendered positionally; all other arguments are
      rendered as keywords, and only when their value differs from the
      default.
    omit_kwarg
      Names from the inspected signatures that are never rendered as
      keywords.  This does not apply to ``additional_kw``.

    """
    if to_inspect is None:
        to_inspect = [obj]
    else:
        to_inspect = _collections.to_list(to_inspect)

    # stands in for "no default", so any present value compares unequal
    missing = object()

    pos_args = []
    kw_args: _collections.OrderedDict[str, Any] = _collections.OrderedDict()
    vargs = None
    for i, insp in enumerate(to_inspect):
        try:
            spec = compat.inspect_getfullargspec(insp.__init__)
        except TypeError:
            # not introspectable (e.g. a C-level __init__); skip it
            continue

        default_len = len(spec.defaults) if spec.defaults else 0

        # arguments without a default, minus "self".  The end index is
        # explicit because args[1:-default_len] is an empty slice when
        # default_len is 0, which used to drop these arguments for every
        # inspected entry after the first.
        required_args = spec.args[1 : len(spec.args) - default_len]

        if i == 0:
            if spec.varargs:
                vargs = spec.varargs
            pos_args.extend(required_args)
        else:
            kw_args.update((arg, missing) for arg in required_args)

        if default_len:
            assert spec.defaults
            kw_args.update(zip(spec.args[-default_len:], spec.defaults))

    output: List[str] = []

    output.extend(repr(getattr(obj, arg, None)) for arg in pos_args)

    if vargs is not None and hasattr(obj, vargs):
        output.extend([repr(val) for val in getattr(obj, vargs)])

    for arg, defval in kw_args.items():
        if arg in omit_kwarg:
            continue
        try:
            val = getattr(obj, arg, missing)
            if val is not missing and val != defval:
                output.append("%s=%r" % (arg, val))
        except Exception:
            # best effort: attribute access or comparison may raise for
            # arbitrary objects; such an argument is left out
            pass

    if additional_kw:
        for arg, defval in additional_kw:
            try:
                val = getattr(obj, arg, missing)
                if val is not missing and val != defval:
                    output.append("%s=%r" % (arg, val))
            except Exception:
                # best effort, as above
                pass

    return "%s(%s)" % (obj.__class__.__name__, ", ".join(output))

861 

862 

def class_hierarchy(cls):
    """Return an unordered sequence of all classes related to cls.

    Both directions are followed: base classes and subclasses, including
    those reached through diamond hierarchies.

    Fibs slightly: subclasses of builtin types are not returned.  Thus
    class_hierarchy(class A(object)) returns (A, object), not A plus every
    class systemwide that derives from object.

    """
    hier = {cls}
    process = list(cls.__mro__)

    def _enqueue(candidate):
        # record a newly discovered class and schedule it for traversal
        if candidate not in hier:
            process.append(candidate)
            hier.add(candidate)

    while process:
        current = process.pop()

        for base in current.__bases__:
            _enqueue(base)

        if current.__module__ == "builtins" or not hasattr(
            current, "__subclasses__"
        ):
            continue

        # a metaclass has to be passed explicitly to type.__subclasses__
        if issubclass(current, type):
            subclasses = current.__subclasses__(current)
        else:
            subclasses = current.__subclasses__()

        for sub in subclasses:
            _enqueue(sub)

    return list(hier)

899 

900 

def iterate_attributes(cls):
    """Iterate ``(key, value)`` for every name reported by ``dir(cls)``.

    Each value is read from the ``__dict__`` of the first class in the MRO
    that defines the key, rather than through getattr(), so that
    class-sensitive descriptors (i.e. property.__get__()) are not called.
    Names that no class in the MRO defines are skipped.

    """
    for key in dir(cls):
        owner = next(
            (klass for klass in cls.__mro__ if key in klass.__dict__), None
        )
        if owner is not None:
            yield (key, owner.__dict__[key])

915 

916 

def monkeypatch_proxied_specials(
    into_cls,
    from_cls,
    skip=None,
    only=None,
    name="self.proxy",
    from_instance=None,
):
    """Automates delegation of __specials__ for a proxying type.

    For each selected double-underscore method of ``from_cls``, a
    forwarding method is generated from source text and set on
    ``into_cls``.  The generated method calls the same-named method on the
    expression given by ``name``.

    into_cls
      Class that receives the generated methods.
    from_cls
      Class whose special methods are proxied.
    skip
      Names to leave out; a built-in list is used when None.  Only
      consulted when ``only`` is not given.
    only
      Explicit collection of method names to proxy; when given, ``skip``
      and the "already present on into_cls" check are bypassed.
    name
      Source expression for the delegate inside the generated method.
    from_instance
      When not None, made available to the generated code as a global
      under the key ``name``.

    """

    if only:
        dunders = only
    else:
        if skip is None:
            skip = (
                "__slots__",
                "__del__",
                "__getattribute__",
                "__metaclass__",
                "__getstate__",
                "__setstate__",
            )
        # every dunder of from_cls that into_cls does not already have
        dunders = [
            m
            for m in dir(from_cls)
            if (
                m.startswith("__")
                and m.endswith("__")
                and not hasattr(into_cls, m)
                and m not in skip
            )
        ]

    for method in dunders:
        try:
            maybe_fn = getattr(from_cls, method)
            if not hasattr(maybe_fn, "__call__"):
                continue
            # unwrap a method object to its underlying function
            maybe_fn = getattr(maybe_fn, "__func__", maybe_fn)
            fn = cast(types.FunctionType, maybe_fn)

        except AttributeError:
            continue
        try:
            spec = compat.inspect_getfullargspec(fn)
            fn_args = compat.inspect_formatargspec(spec[0])
            d_args = compat.inspect_formatargspec(spec[0][1:])
        except TypeError:
            # not introspectable; fall back to a catch-all signature
            fn_args = "(self, *args, **kw)"
            d_args = "(*args, **kw)"

        # the template is filled from local variables: method, fn_args,
        # name and d_args
        py = (
            "def %(method)s%(fn_args)s: "
            "return %(name)s.%(method)s%(d_args)s" % locals()
        )

        env: Dict[str, types.FunctionType] = (
            from_instance is not None and {name: from_instance} or {}
        )
        exec(py, env)
        try:
            env[method].__defaults__ = fn.__defaults__
        except AttributeError:
            # fn has no __defaults__ to copy
            pass
        setattr(into_cls, method, env[method])

982 

983 

def methods_equivalent(meth1, meth2):
    """Return True if the two methods are the same implementation.

    Each argument is reduced to its ``__func__`` when it has one, and the
    results are compared by identity.

    """
    impl1 = getattr(meth1, "__func__", meth1)
    impl2 = getattr(meth2, "__func__", meth2)
    return impl1 is impl2

990 

991 

def as_interface(obj, cls=None, methods=None, required=None):
    """Ensure basic interface compliance for an instance or dict of callables.

    Checks that ``obj`` implements public methods of ``cls`` or has members
    listed in ``methods``.  If ``required`` is not supplied, implementing at
    least one interface method is sufficient.  Methods present on ``obj`` that
    are not in the interface are ignored.

    If ``obj`` is a dict and ``dict`` does not meet the interface
    requirements, the keys of the dictionary are inspected.  Keys present in
    ``obj`` that are not in the interface will raise TypeErrors.

    Raises TypeError if ``obj`` does not meet the interface criteria.

    In all passing cases, an object with callable members is returned.  In the
    simple case, ``obj`` is returned as-is; if dict processing kicks in then
    an anonymous class is returned.

    obj
      A type, instance, or dictionary of callables.
    cls
      Optional, a type.  All public methods of cls are considered the
      interface.  An ``obj`` instance of cls will always pass, ignoring
      ``required``.
    methods
      Optional, a sequence of method names to consider as the interface.
    required
      Optional, a sequence of mandatory implementations.  If omitted, an
      ``obj`` that provides at least one interface method is considered
      sufficient.  As a convenience, required may be a type, in which case
      every method of the interface is required.

    """
    if not (cls or methods):
        raise TypeError("a class or collection of method names are required")

    if isinstance(cls, type) and isinstance(obj, cls):
        return obj

    if methods:
        interface = set(methods)
    else:
        interface = {m for m in dir(cls) if not m.startswith("_")}
    implemented = set(dir(obj))

    # "ge": all required names must be present.  "gt" against the empty
    # set: at least one interface name must be present.
    if isinstance(required, type):
        required, complies = interface, operator.ge
    elif required:
        required, complies = set(required), operator.ge
    else:
        required, complies = set(), operator.gt

    if complies(implemented & interface, required):
        return obj

    # No dict duck typing here.
    if not isinstance(obj, dict):
        qualifier = "any of" if complies is operator.gt else "all of"
        raise TypeError(
            "%r does not implement %s: %s"
            % (obj, qualifier, ", ".join(interface))
        )

    class AnonymousInterface:
        """A callable-holding shell."""

    if cls:
        AnonymousInterface.__name__ = "Anonymous" + cls.__name__
    found = set()

    for method, impl in dictlike_iteritems(obj):
        if method not in interface:
            raise TypeError("%r: unknown in this interface" % method)
        if not callable(impl):
            raise TypeError("%r=%r is not callable" % (method, impl))
        setattr(AnonymousInterface, method, staticmethod(impl))
        found.add(method)

    if complies(found, required):
        return AnonymousInterface

    raise TypeError(
        "dictionary does not contain required keys %s"
        % ", ".join(required - found)
    )

1076 

1077 

_GFD = TypeVar("_GFD", bound="generic_fn_descriptor[Any]")


class generic_fn_descriptor(Generic[_T_co]):
    """Base descriptor that holds a function to be exposed as an attribute.

    This class only stores the wrapped function together with its name and
    docstring; ``__get__()``, ``_reset()`` and ``reset()`` all raise
    ``NotImplementedError`` here and are supplied by the "memoized" and
    "non-memoized" implementation classes.  Those implementations are hidden
    from type checkers, as Mypy seems to not be able to handle seeing
    multiple kinds of descriptor classes used for the same attribute.

    :param fget: the function being proxied.
    :param doc: optional docstring; when empty or omitted, the docstring of
     ``fget`` is used.

    """

    fget: Callable[..., _T_co]
    __doc__: Optional[str]
    __name__: str

    def __init__(self, fget: Callable[..., _T_co], doc: Optional[str] = None):
        self.fget = fget
        self.__name__ = fget.__name__
        # an explicit, non-empty ``doc`` wins over the function's docstring
        self.__doc__ = doc if doc else fget.__doc__

    @overload
    def __get__(self: _GFD, obj: None, cls: Any) -> _GFD: ...

    @overload
    def __get__(self, obj: object, cls: Any) -> _T_co: ...

    def __get__(self: _GFD, obj: Any, cls: Any) -> Union[_GFD, _T_co]:
        # implemented by subclasses only
        raise NotImplementedError()

    if TYPE_CHECKING:
        # declared for the benefit of type checkers only; no setter or
        # deleter is defined on this class at runtime.

        def __set__(self, instance: Any, value: Any) -> None: ...

        def __delete__(self, instance: Any) -> None: ...

    def _reset(self, obj: Any) -> None:
        # implemented by subclasses only
        raise NotImplementedError()

    @classmethod
    def reset(cls, obj: Any, name: str) -> None:
        # implemented by subclasses only
        raise NotImplementedError()

1122 

1123 

class _non_memoized_property(generic_fn_descriptor[_T_co]):
    """A plain descriptor that calls the wrapped function on every access.

    The primary rationale is to provide a plain attribute that's compatible
    with memoized_property and which is also recognized as equivalent by
    mypy.

    """

    if not TYPE_CHECKING:

        def __get__(self, obj, cls):
            # class-level access hands back the descriptor itself;
            # instance access invokes the function each time.
            return self if obj is None else self.fget(obj)

1139 

1140 

class _memoized_property(generic_fn_descriptor[_T_co]):
    """A read-only @property that is only evaluated once.

    The computed value is stored in the instance ``__dict__`` under the
    name of the wrapped function.

    """

    if not TYPE_CHECKING:

        def __get__(self, obj, cls):
            if obj is None:
                return self
            value = self.fget(obj)
            obj.__dict__[self.__name__] = value
            return value

    def _reset(self, obj):
        """Discard the stored value of this property on ``obj``."""
        _memoized_property.reset(obj, self.__name__)

    @classmethod
    def reset(cls, obj, name):
        """Remove ``name`` from ``obj.__dict__``; a no-op if not present."""
        obj.__dict__.pop(name, None)

1158 

1159 

1160# despite many attempts to get Mypy to recognize an overridden descriptor 

1161# where one is memoized and the other isn't, there seems to be no reliable 

1162# way other than completely deceiving the type checker into thinking there 

1163# is just one single descriptor type everywhere. Otherwise, if a superclass 

1164# has non-memoized and subclass has memoized, that requires 

1165# "class memoized(non_memoized)". but then if a superclass has memoized and 

1166# subclass has non-memoized, the class hierarchy of the descriptors 

1167# would need to be reversed; "class non_memoized(memoized)". so there's no 

1168# way to achieve this. 

1169# additional issues, RO properties: 

1170# https://github.com/python/mypy/issues/12440 

if TYPE_CHECKING:
    # type checkers see one single descriptor class for both the memoized
    # and the non-memoized flavors, so the two can be freely mixed
    memoized_property = generic_fn_descriptor
    non_memoized_property = generic_fn_descriptor

    # for read only situations, mypy only sees @property as read only.
    # read only is needed when a subtype specializes the return type
    # of a property, meaning assignment needs to be disallowed
    ro_memoized_property = property
    ro_non_memoized_property = property

else:
    # at runtime, the "ro" names are plain aliases of the real
    # implementation classes
    memoized_property = _memoized_property
    ro_memoized_property = _memoized_property
    non_memoized_property = _non_memoized_property
    ro_non_memoized_property = _non_memoized_property

1186 

1187 

def memoized_instancemethod(fn: _F) -> _F:
    """Decorate a method so that its return value is memoized per instance.

    Best applied to no-arg methods: memoization is not sensitive to
    argument values, and will always return the same value even when
    called with different arguments.

    The first call runs ``fn`` and then stores a replacement callable in the
    instance ``__dict__`` under the method's name; that replacement returns
    the first result for every later call.

    """

    def oneshot(self, *args, **kw):
        cached = fn(self, *args, **kw)

        def memo(*a, **kw):
            return cached

        memo.__name__, memo.__doc__ = fn.__name__, fn.__doc__
        # the instance-level entry shadows the method defined on the class
        self.__dict__[fn.__name__] = memo
        return cached

    return update_wrapper(oneshot, fn)  # type: ignore

1209 

1210 

class HasMemoized:
    """A mixin class that maintains the names of memoized elements in a
    collection for easy cache clearing, generative, etc.

    The names are kept in ``_memoized_keys``; the memoized values themselves
    live in the instance ``__dict__``.

    """

    if not TYPE_CHECKING:
        # support classes that want to have __slots__ with an explicit
        # slot for __dict__.  not sure if that requires base __slots__ here.
        __slots__ = ()

    _memoized_keys: FrozenSet[str] = frozenset()

    def _reset_memoizations(self) -> None:
        """Remove every recorded memoized name from the instance dict."""
        state = self.__dict__
        for name in self._memoized_keys:
            state.pop(name, None)

    def _assert_no_memoizations(self) -> None:
        """Assert that no recorded memoized name is currently stored."""
        for name in self._memoized_keys:
            assert name not in self.__dict__

    def _set_memoized_attribute(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key`` and record ``key`` as memoized."""
        self.__dict__[key] = value
        self._memoized_keys |= {key}

    class memoized_attribute(memoized_property[_T]):
        """A read-only @property that is only evaluated once.

        In addition to storing the value in the instance ``__dict__``, the
        attribute name is added to the owner's ``_memoized_keys``.

        :meta private:

        """

        fget: Callable[..., _T]
        __doc__: Optional[str]
        __name__: str

        def __init__(self, fget: Callable[..., _T], doc: Optional[str] = None):
            self.fget = fget
            self.__name__ = fget.__name__
            self.__doc__ = doc if doc else fget.__doc__

        @overload
        def __get__(self: _MA, obj: None, cls: Any) -> _MA: ...

        @overload
        def __get__(self, obj: Any, cls: Any) -> _T: ...

        def __get__(self, obj, cls):
            if obj is None:
                return self
            value = self.fget(obj)
            name = self.__name__
            obj.__dict__[name] = value
            obj._memoized_keys |= {name}
            return value

    @classmethod
    def memoized_instancemethod(cls, fn: _F) -> _F:
        """Decorate a method so that its return value is memoized.

        The first call stores a replacement callable in the instance
        ``__dict__`` and records the method name in ``_memoized_keys``.

        :meta private:

        """

        def oneshot(self: Any, *args: Any, **kw: Any) -> Any:
            cached = fn(self, *args, **kw)

            def memo(*a, **kw):
                return cached

            memo.__name__, memo.__doc__ = fn.__name__, fn.__doc__
            self.__dict__[fn.__name__] = memo
            self._memoized_keys |= {fn.__name__}
            return cached

        return update_wrapper(oneshot, fn)  # type: ignore


if TYPE_CHECKING:
    # presented to type checkers as a plain read-only property
    HasMemoized_ro_memoized_attribute = property
else:
    HasMemoized_ro_memoized_attribute = HasMemoized.memoized_attribute

1292 

1293 

class MemoizedSlots:
    """Apply memoized items to an object using a __getattr__ scheme.

    This allows the functionality of memoized_property and
    memoized_instancemethod to be available to a class using __slots__.

    For an attribute ``name`` that is not yet set, ``__getattr__`` looks on
    the class for ``_memoized_attr_<name>`` (called once, result stored with
    ``setattr``) or ``_memoized_method_<name>`` (a one-shot wrapper is
    returned; after its first call a callable returning the first result is
    stored with ``setattr``).

    The memoized get is not threadsafe under freethreading and the
    creator method may in extremely rare cases be called more than once.

    """

    __slots__ = ()

    def _fallback_getattr(self, key):
        """Hook used when no memoized creator exists for ``key``."""
        raise AttributeError(key)

    def __getattr__(self, key: str) -> Any:
        if key.startswith(("_memoized_attr_", "_memoized_method_")):
            raise AttributeError(key)

        # to avoid recursion errors when interacting with other __getattr__
        # schemes that refer to this one, when testing for memoized method
        # look at __class__ only rather than going into __getattr__ again.
        owner = self.__class__

        attr_creator = f"_memoized_attr_{key}"
        if hasattr(owner, attr_creator):
            value = getattr(self, attr_creator)()
            setattr(self, key, value)
            return value

        method_creator = f"_memoized_method_{key}"
        if not hasattr(owner, method_creator):
            return self._fallback_getattr(key)

        meth = getattr(self, method_creator)

        def oneshot(*args, **kw):
            result = meth(*args, **kw)

            def memo(*a, **kw):
                return result

            memo.__name__ = meth.__name__
            memo.__doc__ = meth.__doc__
            setattr(self, key, memo)
            return result

        oneshot.__doc__ = meth.__doc__
        return oneshot

1340 

1341 

# from paste.deploy.converters
def asbool(obj: Any) -> bool:
    """Coerce ``obj`` to a boolean.

    Strings are stripped and lower-cased, then matched against a fixed set
    of true / false words; any other string raises ``ValueError``.
    Non-string objects are passed to ``bool()``.

    """
    if not isinstance(obj, str):
        return bool(obj)

    obj = obj.strip().lower()
    if obj in ("true", "yes", "on", "y", "t", "1"):
        return True
    if obj in ("false", "no", "off", "n", "f", "0"):
        return False
    # the message reports the normalized form of the string
    raise ValueError("String is not true/false: %r" % obj)

1353 

1354 

def bool_or_str(*text: str) -> Callable[[str], Union[str, bool]]:
    """Return a callable that will evaluate a string as
    boolean, or one of a set of "alternate" string values.

    A value that is one of ``text`` is returned unchanged; anything else is
    handed to :func:`asbool`.

    """

    def bool_or_value(obj: str) -> Union[str, bool]:
        return obj if obj in text else asbool(obj)

    return bool_or_value

1368 

1369 

def asint(value: Any) -> Optional[int]:
    """Coerce to integer, passing ``None`` through unchanged."""

    return None if value is None else int(value)

1376 

1377 

def coerce_kw_type(
    kw: Dict[str, Any],
    key: str,
    type_: Type[Any],
    flexi_bool: bool = True,
    dest: Optional[Dict[str, Any]] = None,
) -> None:
    r"""If 'key' is present in dict 'kw', coerce its value to type 'type\_' if
    necessary.  If 'flexi_bool' is True, the string '0' is considered false
    when coercing to boolean.

    Nothing happens when the key is absent, when the value is ``None``, or
    when ``type_`` is a class and the value is already an instance of it.
    The coerced value is written to ``dest`` if given, otherwise back into
    ``kw``.
    """

    if key not in kw:
        return

    current = kw[key]
    if isinstance(type_, type) and isinstance(current, type_):
        return
    if current is None:
        return

    target = kw if dest is None else dest
    if type_ is bool and flexi_bool:
        target[key] = asbool(current)
    else:
        target[key] = type_(current)

1402 

1403 

def constructor_key(obj: Any, cls: Type[Any]) -> Tuple[Any, ...]:
    """Produce a tuple structure that is cacheable using the __dict__ of
    obj to retrieve values.

    The tuple starts with ``cls`` and is followed by a ``(name, value)``
    pair for each name from ``get_cls_kwargs(cls)`` that is present in
    ``obj.__dict__``.

    """
    key: List[Any] = [cls]
    for name in get_cls_kwargs(cls):
        if name in obj.__dict__:
            key.append((name, obj.__dict__[name]))
    return tuple(key)

1413 

1414 

def constructor_copy(obj: _T, cls: Type[_T], *args: Any, **kw: Any) -> _T:
    """Instantiate cls using the __dict__ of obj as constructor arguments.

    The names reported by ``get_cls_kwargs(cls)`` are looked up in
    ``obj.__dict__``; keyword arguments passed explicitly take precedence
    over values found there.

    """

    names = get_cls_kwargs(cls)
    # only names that were not given explicitly are filled in from obj
    for name in names.difference(kw):
        if name in obj.__dict__:
            kw[name] = obj.__dict__[name]
    return cls(*args, **kw)

1427 

1428 

def counter() -> Callable[[], int]:
    """Return a threadsafe counter function.

    Each call of the returned function yields the next integer, starting
    at 1; the increment is performed while holding a lock.

    """

    guard = threading.Lock()
    sequence = itertools.count(1)

    # avoid the 2to3 "next" transformation...
    def _next():
        guard.acquire()
        try:
            return next(sequence)
        finally:
            guard.release()

    return _next

1441 

1442 

def duck_type_collection(
    specimen: Any, default: Optional[Type[Any]] = None
) -> Optional[Type[Any]]:
    """Given an instance or class, guess if it is or is acting as one of
    the basic collection types: list, set and dict.  If the __emulates__
    property is present, return that preferentially.

    Checks are made in this order: ``__emulates__``, then a subclass /
    instance test against list, set and dict, then the presence of an
    ``append``, ``add`` or ``set`` attribute.  ``default`` is returned when
    nothing matches.
    """

    if hasattr(specimen, "__emulates__"):
        emulated = specimen.__emulates__
        # canonicalize set vs sets.Set to a standard: the builtin set
        if emulated is not None and issubclass(emulated, set):
            return set
        return emulated  # type: ignore

    isa = issubclass if isinstance(specimen, type) else isinstance
    for builtin in (list, set, dict):
        if isa(specimen, builtin):
            return builtin

    for method_name, builtin in (
        ("append", list),
        ("add", set),
        ("set", dict),
    ):
        if hasattr(specimen, method_name):
            return builtin

    return default

1476 

1477 

def assert_arg_type(
    arg: Any, argtype: Union[Tuple[Type[Any], ...], Type[Any]], name: str
) -> Any:
    """Return ``arg`` if it is an instance of ``argtype``.

    :param arg: the value to check.
    :param argtype: a type, or a tuple of acceptable types.
    :param name: the argument name, used in the error message.
    :raises ArgumentError: if ``arg`` is not an instance of ``argtype``.

    """
    if isinstance(arg, argtype):
        return arg

    if isinstance(argtype, tuple):
        expected = " or ".join("'%s'" % a for a in argtype)
        raise exc.ArgumentError(
            "Argument '%s' is expected to be one of type %s, got '%s'"
            % (name, expected, type(arg))
        )

    raise exc.ArgumentError(
        "Argument '%s' is expected to be of type '%s', got '%s'"
        % (name, argtype, type(arg))
    )

1494 

1495 

def dictlike_iteritems(dictlike):
    """Return (key, value) pairs for almost any dict-like object.

    An object with ``items()`` yields a list of its items.  Otherwise a
    per-key getter (``__getitem__``, else ``get``) is combined with
    ``iterkeys()`` or ``keys()`` to produce a generator of pairs.

    :raises TypeError: if the object offers neither a getter nor a way to
     enumerate its keys.

    """

    if hasattr(dictlike, "items"):
        return list(dictlike.items())

    lookup = getattr(dictlike, "__getitem__", getattr(dictlike, "get", None))
    if lookup is None:
        raise TypeError("Object '%r' is not dict-like" % dictlike)

    if hasattr(dictlike, "iterkeys"):

        # generator function: iterkeys() is not called until the first
        # item is requested
        def iterator():
            for key in dictlike.iterkeys():
                assert lookup is not None
                yield key, lookup(key)

        return iterator()

    if not hasattr(dictlike, "keys"):
        raise TypeError("Object '%r' is not dict-like" % dictlike)

    return iter((key, lookup(key)) for key in dictlike.keys())

1518 

1519 

class classproperty(property):
    """A decorator that behaves like @property except that operates
    on classes rather than instances.

    The getter is always invoked with the owning class, whether the
    attribute is read from the class or from an instance.

    The decorator is currently special when using the declarative
    module, but note that the
    :class:`~.sqlalchemy.ext.declarative.declared_attr`
    decorator should be used for this purpose with declarative.

    """

    fget: Callable[[Any], Any]

    def __init__(self, fget: Callable[[Any], Any], *arg: Any, **kw: Any):
        super().__init__(fget, *arg, **kw)
        # always take the docstring from the getter
        self.__doc__ = fget.__doc__

    def __get__(self, obj: Any, cls: Optional[type] = None) -> Any:
        # ``obj`` is ignored; the class is what gets passed to the getter
        return self.fget(cls)

1539 

1540 

class hybridproperty(Generic[_T]):
    """A read-only descriptor with separate instance- and class-level
    getters.

    Both levels start out using the decorated function; a different
    class-level function may be supplied with :meth:`classlevel`.

    """

    def __init__(self, func: Callable[..., _T]):
        self.func = func
        self.clslevel = func

    def __get__(self, instance: Any, owner: Any) -> _T:
        if instance is not None:
            return self.func(instance)
        return self.clslevel(owner)

    def classlevel(self, func: Callable[..., Any]) -> hybridproperty[_T]:
        """Set the function used for class-level access; returns self."""
        self.clslevel = func
        return self

1556 

1557 

class rw_hybridproperty(Generic[_T]):
    """A descriptor with separate instance- and class-level getters plus an
    instance-level setter.

    Both getter levels start out using the decorated function.  The setter
    must be registered with :meth:`setter` before assignment is attempted.

    """

    def __init__(self, func: Callable[..., _T]):
        self.func = func
        self.clslevel = func
        self.setfn: Optional[Callable[..., Any]] = None

    def __get__(self, instance: Any, owner: Any) -> _T:
        if instance is not None:
            return self.func(instance)
        return self.clslevel(owner)

    def __set__(self, instance: Any, value: Any) -> None:
        assert self.setfn is not None
        self.setfn(instance, value)

    def setter(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]:
        """Set the function used for assignment; returns self."""
        self.setfn = func
        return self

    def classlevel(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]:
        """Set the function used for class-level access; returns self."""
        self.clslevel = func
        return self

1582 

1583 

class hybridmethod(Generic[_T]):
    """Decorate a function as cls- or instance- level.

    Access through an instance binds the function to the instance; access
    through the class binds the class-level function (initially the same
    function) to the class.

    """

    def __init__(self, func: Callable[..., _T]):
        self.func = self.__func__ = func
        self.clslevel = func

    def __get__(self, instance: Any, owner: Any) -> Callable[..., _T]:
        if instance is not None:
            return self.func.__get__(instance, owner)  # type:ignore
        return self.clslevel.__get__(owner, owner.__class__)  # type:ignore

    def classlevel(self, func: Callable[..., Any]) -> hybridmethod[_T]:
        """Set the function used for class-level access; returns self."""
        self.clslevel = func
        return self

1600 

1601 

class symbol(int):
    """A constant symbol.

    >>> symbol("foo") is symbol("foo")
    True
    >>> symbol("foo")
    symbol('foo')

    A slight refinement of the MAGICCOOKIE=object() pattern.  The primary
    advantage of symbol() is its repr().  They are also singletons.

    Repeated calls of symbol('name') will all return the same instance.

    :param name: string name of the symbol; this is the registry key.
    :param doc: optional docstring assigned to a newly created symbol.
    :param canonical: optional integer value; defaults to ``hash(name)``
     when a new symbol is created.

    """

    name: str

    symbols: Dict[str, symbol] = {}
    _lock = threading.Lock()

    def __new__(
        cls,
        name: str,
        doc: Optional[str] = None,
        canonical: Optional[int] = None,
    ) -> symbol:
        with cls._lock:
            existing = cls.symbols.get(name)
            if existing is not None:
                # an already-registered symbol can't take on a different
                # integer value
                if canonical and canonical != existing:
                    raise TypeError(
                        f"Can't replace canonical symbol for {name!r} "
                        f"with new int value {canonical}"
                    )
                return existing

            assert isinstance(name, str)
            if canonical is None:
                canonical = hash(name)
            created = int.__new__(symbol, canonical)
            created.name = name
            if doc:
                created.__doc__ = doc

            # NOTE: we should ultimately get rid of this global thing,
            # however, currently it is to support pickling.  The best
            # change would be when we are on py3.11 at a minimum, we
            # switch to stdlib enum.IntFlag.
            cls.symbols[name] = created
            return created

    def __reduce__(self):
        # reconstructs via symbol(name, "x", <int value>)
        return symbol, (self.name, "x", int(self))

    def __str__(self):
        return repr(self)

    def __repr__(self):
        return f"symbol({self.name!r})"

1660 

1661 

class _IntFlagMeta(type):
    """Metaclass that turns integer class attributes into ``symbol`` objects.

    For each non-dunder entry in the class body, an integer value is
    replaced on the class by ``symbol(name, canonical=value)``.  A
    non-integer value raises ``TypeError`` unless its name starts with an
    underscore, in which case it is left alone.  The created symbols are
    collected in ``_items`` and exposed by name through ``__members__``.

    """

    def __init__(
        cls,
        classname: str,
        bases: Tuple[Type[Any], ...],
        dict_: Dict[str, Any],
        **kw: Any,
    ) -> None:
        members: List[symbol]
        cls._items = members = []
        for key, value in dict_.items():
            if re.match(r"^__.*__$", key):
                continue
            if not isinstance(value, int):
                if key.startswith("_"):
                    # private, non-integer attribute
                    continue
                raise TypeError("Expected integer values for IntFlag")

            sym = symbol(key, canonical=value)
            setattr(cls, key, sym)
            members.append(sym)

        cls.__members__ = _collections.immutabledict(
            {sym.name: sym for sym in members}
        )

    def __iter__(self) -> Iterator[symbol]:
        raise NotImplementedError(
            "iter not implemented to ensure compatibility with "
            "Python 3.11 IntFlag.  Please use __members__.  See "
            "https://github.com/python/cpython/issues/99304"
        )

1694 

1695 

class _FastIntFlag(metaclass=_IntFlagMeta):
    """An 'IntFlag' copycat that isn't slow when performing bitwise
    operations.

    The public ``FastIntFlag`` name refers to ``enum.IntFlag`` under
    TYPE_CHECKING and to this class otherwise.

    """


if TYPE_CHECKING:
    from enum import IntFlag

    FastIntFlag = IntFlag
else:
    FastIntFlag = _FastIntFlag

1712 

1713 

_E = TypeVar("_E", bound=enum.Enum)


def parse_user_argument_for_enum(
    arg: Any,
    choices: Dict[_E, List[Any]],
    name: str,
    resolve_symbol_names: bool = False,
) -> Optional[_E]:
    """Given a user parameter, parse the parameter into a chosen value
    from a list of choice objects, typically Enum values.

    The user argument can be a string name that matches the name of a
    symbol, or the symbol object itself, or any number of alternate choices
    such as True/False/ None etc.

    :param arg: the user argument.
    :param choices: dictionary of enum values to lists of possible
        entries for each.
    :param name: name of the argument.   Used in an :class:`.ArgumentError`
        that is raised if the parameter doesn't match any available argument.
    :param resolve_symbol_names: when True, ``arg`` is also compared to the
        ``.name`` of each enum value.
    :return: the matching enum value, or ``None`` if ``arg`` is ``None`` and
        matched nothing.

    """
    for enum_value, alternates in choices.items():
        if (
            arg is enum_value
            or (resolve_symbol_names and arg == enum_value.name)
            or arg in alternates
        ):
            return enum_value

    if arg is None:
        return None

    raise exc.ArgumentError(f"Invalid value for '{name}': {arg!r}")

1749 

1750 

# module-wide sequence; the next value to be handed out
_creation_order = 1


def set_creation_order(instance: Any) -> None:
    """Assign a '_creation_order' sequence to the given instance.

    This allows multiple instances to be sorted in order of creation
    (typically within a single thread; the counter is not particularly
    threadsafe).

    :param instance: object that receives the ``_creation_order``
     attribute; the module-level counter is then advanced by one.

    """
    global _creation_order
    setattr(instance, "_creation_order", _creation_order)
    _creation_order += 1

1765 

1766 

def warn_exception(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Execute the given function, converting any ``Exception`` it raises
    into a warning.

    :return: the function's return value, or ``None`` when an exception
     was caught and reported through :func:`warn`.

    """
    try:
        return func(*args, **kwargs)
    except Exception as err:
        warn("%s('%s') ignored" % (type(err), err))

1776 

1777 

def ellipses_string(value, len_=25):
    """Truncate ``value`` to ``len_`` items followed by ``"..."``.

    Values no longer than ``len_`` are returned unchanged, as is any value
    for which measuring, slicing or formatting raises ``TypeError``.

    """
    try:
        return "%s..." % value[0:len_] if len(value) > len_ else value
    except TypeError:
        return value

1786 

1787 

class _hash_limit_string(str):
    """A string subclass that can only be hashed on a maximum amount
    of unique values.

    This is used for warnings so that we can send out parameterized warnings
    without the __warningregistry__ of the module, or the non-overridable
    "once" registry within warnings.py, overloading memory.

    :param value: the message template, interpolated with ``%``.
    :param num: the number of hash buckets for this template.
    :param args: the values interpolated into ``value``.

    """

    _hash: int

    def __new__(
        cls, value: str, num: int, args: Sequence[Any]
    ) -> _hash_limit_string:
        rendered = value % args
        rendered += (
            " (this warning may be suppressed after %d occurrences)" % num
        )
        self = super().__new__(cls, rendered)
        # the hash depends only on the template and one of ``num`` buckets
        bucket = hash(rendered) % num
        self._hash = hash("%s_%d" % (value, bucket))
        return self

    def __hash__(self) -> int:
        return self._hash

    def __eq__(self, other: Any) -> bool:
        # equality is defined purely in terms of the (limited) hash
        return hash(self) == hash(other)

1816 

1817 

def warn(msg: str, code: Optional[str] = None) -> None:
    """Issue a warning.

    If msg is a string, :class:`.exc.SAWarning` is used as
    the category.

    :param msg: the warning message.
    :param code: optional code; when given, an :class:`.exc.SAWarning`
     instance carrying that code is emitted instead of the plain message.

    """
    if not code:
        _warnings_warn(msg, exc.SAWarning)
    else:
        _warnings_warn(exc.SAWarning(msg, code=code))

1829 

1830 

def warn_limited(msg: str, args: Sequence[Any]) -> None:
    """Issue a warning with a parameterized string, limiting the number
    of registrations.

    When ``args`` is non-empty the message is wrapped in a
    ``_hash_limit_string`` with 10 hash buckets; otherwise ``msg`` is
    emitted as is.

    """
    message = _hash_limit_string(msg, 10, args) if args else msg
    _warnings_warn(message, exc.SAWarning)

1839 

1840 

# maps the code object of a tagged function to its (message, category)
_warning_tags: Dict[CodeType, Tuple[str, Type[Warning]]] = {}


def tag_method_for_warnings(
    message: str, category: Type[Warning]
) -> Callable[[_F], _F]:
    """Return a decorator that registers a function in ``_warning_tags``.

    The decorated function is returned unchanged; its ``__code__`` object
    becomes the key for the ``(message, category)`` entry.

    """
    tag = (message, category)

    def go(fn):
        _warning_tags[fn.__code__] = tag
        return fn

    return go

1852 

1853 

_not_sa_pattern = re.compile(r"^(?:sqlalchemy\.(?!testing)|alembic\.)")


def _warnings_warn(
    message: Union[str, Warning],
    category: Optional[Type[Warning]] = None,
    stacklevel: int = 2,
) -> None:
    """Emit a warning via ``warnings.warn()`` with an adjusted stacklevel.

    Starting at the frame ``stacklevel`` levels up, frames are walked
    outwards; ``stacklevel`` is incremented for each frame until one is
    reached whose module ``__name__`` does not match ``_not_sa_pattern``.
    While walking, any frame whose code object is registered in
    ``_warning_tags`` contributes a message suffix and, if no category was
    given, its category.

    :param message: the warning text or a ``Warning`` instance.
    :param category: warning category; defaults to the type of ``message``
     when that is a ``Warning`` instance.
    :param stacklevel: initial frame offset relative to this function.

    """

    if category is None and isinstance(message, Warning):
        category = type(message)

    # adjust the given stacklevel to be outside of SQLAlchemy
    try:
        frame = sys._getframe(stacklevel)
    except BaseException:
        # either being called from less than 3 (or given) stacklevels
        # (ValueError), or _getframe() doesn't work due to some weird
        # interpreter issue.  weird, but don't crash
        stacklevel = 0
    else:
        outside_found = False
        tag_found = False
        while frame is not None:
            # using __name__ here requires that we have __name__ in the
            # __globals__ of the decorated string functions we make also.
            # we generate this using {"__name__": fn.__module__}
            if not outside_found:
                module_name = frame.f_globals.get("__name__", "")
                if not _not_sa_pattern.match(module_name):
                    # stop incrementing stack level if an out-of-SQLA line
                    # were found.
                    outside_found = True

            # however, for the warning tag thing, we have to keep
            # scanning up the whole traceback

            if frame.f_code in _warning_tags:
                tag_found = True
                suffix, tagged_category = _warning_tags[frame.f_code]
                category = category or tagged_category
                message = f"{message} ({suffix})"

            frame = frame.f_back  # type: ignore[assignment]

            if not outside_found:
                stacklevel += 1
            elif tag_found:
                break

    if category is not None:
        warnings.warn(message, category, stacklevel=stacklevel + 1)
    else:
        warnings.warn(message, stacklevel=stacklevel + 1)

1910 

1911 

def only_once(
    fn: Callable[..., _T], retry_on_exception: bool
) -> Callable[..., Optional[_T]]:
    """Decorate the given function to be a no-op after it is called exactly
    once.

    :param fn: the function to run a single time.
    :param retry_on_exception: if True, a call that raises does not count;
     the function stays available for the next call.  The exception is
     re-raised in either case.
    :return: a callable that returns ``fn``'s result on the first call and
     ``None`` afterwards.

    """

    pending = [fn]

    def go(*arg: Any, **kw: Any) -> Optional[_T]:
        # strong reference fn so that it isn't garbage collected,
        # which interferes with the event system's expectations
        strong_fn = fn  # noqa
        if not pending:
            return None

        target = pending.pop()
        try:
            return target(*arg, **kw)
        except BaseException:
            if retry_on_exception:
                pending.insert(0, target)
            raise

    return go

1936 

1937 

_SQLA_RE = re.compile(r"sqlalchemy/([a-z_]+/){0,2}[a-z_]+\.py")
_UNITTEST_RE = re.compile(r"unit(?:2|test2?/)")


def chop_traceback(
    tb: List[str],
    exclude_prefix: re.Pattern[str] = _UNITTEST_RE,
    exclude_suffix: re.Pattern[str] = _SQLA_RE,
) -> List[str]:
    """Chop extraneous lines off beginning and end of a traceback.

    :param tb:
      a list of traceback lines as returned by ``traceback.format_stack()``

    :param exclude_prefix:
      a regular expression object matching lines to skip at beginning of
      ``tb``

    :param exclude_suffix:
      a regular expression object matching lines to skip at end of ``tb``

    :return: the slice of ``tb`` that remains once matching leading and
      trailing lines are dropped.
    """
    # half-open window [lo, hi) over the lines that are kept
    lo, hi = 0, len(tb)
    while lo < hi and exclude_prefix.search(tb[lo]):
        lo += 1
    while lo < hi and exclude_suffix.search(tb[hi - 1]):
        hi -= 1
    return tb[lo:hi]

1966 

1967 

def attrsetter(attrname):
    """Return a function ``set(obj, value)`` that assigns
    ``obj.<attrname> = value``.

    The setter is generated from source text using ``exec()``.

    """
    # NOTE: ``attrname`` is interpolated into executed source code; it must
    # never come from untrusted input.
    code = "def set(obj, value): obj.%s = value" % attrname
    env = {"attrname": attrname, "code": code}
    exec(code, env)
    return env["set"]

1973 

1974 

_dunders = re.compile("^__.+__$")


class TypingOnly:
    """A mixin class that marks a class as 'typing only', meaning it has
    absolutely no methods, attributes, or runtime functionality whatsoever.

    A class that lists ``TypingOnly`` directly among its bases may define
    dunder names only; anything else raises ``AssertionError`` when the
    class is created.

    """

    __slots__ = ()

    def __init_subclass__(cls) -> None:
        if TypingOnly in cls.__bases__:
            remaining = set()
            for attr_name in cls.__dict__:
                if _dunders.match(attr_name) is None:
                    remaining.add(attr_name)
            if remaining:
                raise AssertionError(
                    f"Class {cls} directly inherits TypingOnly but has "
                    f"additional attributes {remaining}."
                )
        super().__init_subclass__()

1997 

1998 

class EnsureKWArg:
    r"""Apply translation of functions to accept \**kw arguments if they
    don't already.

    Used to ensure cross-compatibility with third party legacy code, for things
    like compiler visit methods that need to accept ``**kw`` arguments,
    but may have been copied from old code that didn't accept them.

    """

    ensure_kwarg: str
    """a regular expression that indicates method names for which the method
    should accept ``**kw`` arguments.

    The class will scan for methods matching the name template and decorate
    them if necessary to ensure ``**kw`` parameters are accepted.

    """

    def __init_subclass__(cls) -> None:
        pattern = cls.ensure_kwarg
        namespace = cls.__dict__
        if pattern:
            for key in namespace:
                if not re.match(pattern, key):
                    continue
                fn = namespace[key]
                # only functions lacking a ``**kw`` catch-all get wrapped
                if not compat.inspect_getfullargspec(fn).varkw:
                    setattr(cls, key, cls._wrap_w_kw(fn))
        super().__init_subclass__()

    @classmethod
    def _wrap_w_kw(cls, fn: Callable[..., Any]) -> Callable[..., Any]:
        """Return a wrapper for ``fn`` that accepts keyword arguments and
        discards them, passing along positional arguments only."""

        def wrap(*arg: Any, **kw: Any) -> Any:
            return fn(*arg)

        return update_wrapper(wrap, fn)

2038 

2039 

def wrap_callable(wrapper, fn):
    """Augment functools.update_wrapper() to work with objects with
    a ``__call__()`` method.

    :param wrapper:
      the function that receives the name, module and docstring
    :param fn:
      object with __call__ method

    :return: ``wrapper``.  When ``fn`` has a ``__name__``, this is simply
      the result of ``update_wrapper(wrapper, fn)``; otherwise the name is
      taken from the class of ``fn`` and the docstring from
      ``fn.__call__``, falling back to that of ``fn``.

    """
    if hasattr(fn, "__name__"):
        return update_wrapper(wrapper, fn)

    wrapper.__name__ = fn.__class__.__name__
    if hasattr(fn, "__module__"):
        wrapper.__module__ = fn.__module__

    call_doc = getattr(fn.__call__, "__doc__", None)
    if call_doc:
        wrapper.__doc__ = call_doc
    elif fn.__doc__:
        wrapper.__doc__ = fn.__doc__

    return wrapper

2062 

2063 

def quoted_token_parser(value):
    """Parse a dotted identifier with accommodation for quoted names.

    Includes support for SQL-style double quotes as a literal character:
    inside a quoted name, two consecutive double quotes produce a single
    double quote character.

    E.g.::

        >>> quoted_token_parser("name")
        ['name']
        >>> quoted_token_parser("schema.name")
        ['schema', 'name']
        >>> quoted_token_parser('"Schema"."Name"')
        ['Schema', 'Name']
        >>> quoted_token_parser('"Schema"."Name""Foo"')
        ['Schema', 'Name"Foo']

    """

    if '"' not in value:
        return value.split(".")

    tokens: List[List[str]] = [[]]
    in_quotes = False
    pos = 0
    last = len(value) - 1
    while pos <= last:
        ch = value[pos]
        if ch != '"':
            if ch == "." and not in_quotes:
                # an unquoted dot starts the next token
                tokens.append([])
            else:
                tokens[-1].append(ch)
        elif in_quotes and pos < last and value[pos + 1] == '"':
            # doubled quote inside a quoted name: emit one literal quote
            # and skip over the second character of the pair
            tokens[-1].append('"')
            pos += 1
        else:
            in_quotes = not in_quotes
        pos += 1

    return ["".join(chars) for chars in tokens]

2106 

2107 

def add_parameter_text(params: Any, text: str) -> Callable[[_F], _F]:
    """Return a decorator that injects ``text`` into the docstring of the
    decorated function for each of the given parameter names.

    ``params`` is passed through ``_collections.to_list()``.  A function
    without a docstring ends up with an empty-string docstring.

    """
    params = _collections.to_list(params)

    def decorate(fn):
        doc = fn.__doc__ or ""
        if doc:
            doc = inject_param_text(doc, dict.fromkeys(params, text))
        fn.__doc__ = doc
        return fn

    return decorate

2119 

2120 

2121def _dedent_docstring(text: str) -> str: 

2122 split_text = text.split("\n", 1) 

2123 if len(split_text) == 1: 

2124 return text 

2125 else: 

2126 firstline, remaining = split_text 

2127 if not firstline.startswith(" "): 

2128 return firstline + "\n" + textwrap.dedent(remaining) 

2129 else: 

2130 return textwrap.dedent(text) 

2131 

2132 

def inject_docstring_text(
    given_doctext: Optional[str], injecttext: str, pos: int
) -> str:
    """Insert ``injecttext`` into a docstring at a blank-line boundary.

    Both texts are dedented first.  Candidate insertion points are line 0
    followed by the index of every blank line of the docstring; ``pos``
    selects among those candidates and is capped at the last one.

    :param given_doctext: the original docstring; ``None`` is treated as
      an empty string
    :param injecttext: text to insert; a blank line is put in front of it
      if it does not already start with one
    :param pos: index into the list of candidate insertion points
    :return: the combined docstring text

    """
    doc_lines = _dedent_docstring(given_doctext or "").split("\n")
    if len(doc_lines) == 1:
        doc_lines.append("")

    new_lines = textwrap.dedent(injecttext).split("\n")
    if new_lines[0]:
        new_lines.insert(0, "")

    # candidate positions: the very top, then each blank line
    candidates = [0]
    candidates.extend(
        index for index, text in enumerate(doc_lines) if not text.strip()
    )
    target = candidates[min(pos, len(candidates) - 1)]

    doc_lines[target:target] = new_lines
    return "\n".join(doc_lines)

2151 

2152 

# matches the start of a reST ``:param <name>:`` field; group 1 is the
# leading whitespace, group 2 the text between ":param " and the next colon
_param_reg = re.compile(r"(\s+):param (.+?):")


def inject_param_text(doctext: str, inject_params: Dict[str, str]) -> str:
    """Insert extra text after the description of selected ``:param:``
    entries of a docstring.

    The docstring is scanned line by line.  When a ``:param name:`` line is
    found whose name (leading ``*`` characters stripped) is a key of
    ``inject_params``, the associated text is held back and emitted once
    that parameter's description ends: at the next blank line, at the next
    ``:param:`` line, or at the end of the docstring.  The injected text
    is indented like the line following the ``:param:`` line when that
    line has text, otherwise one space deeper than ``:param:`` itself.

    A line ending in ``::`` inside the description is emitted together
    with the line after it, so the blank line that opens a literal block
    does not end the description.

    :param doctext: the docstring text to process
    :param inject_params: mapping of parameter name to the text to inject
    :return: the docstring text with the injections applied

    """
    doclines = collections.deque(doctext.splitlines())
    lines: List[str] = []

    # TODO: this is not working for params like ":param case_sensitive=True:"

    to_inject: Optional[str] = None
    while doclines:
        line = doclines.popleft()

        m = _param_reg.match(line)

        if to_inject is not None:
            if m:
                # the next parameter begins immediately; flush the pending
                # text, then let this line be examined below
                lines.extend(["\n", to_inject, "\n"])
                to_inject = None
            elif not line.rstrip():
                lines.extend([line, to_inject, "\n"])
                to_inject = None
            elif line.endswith("::"):
                # TODO: this still won't cover if the code example itself has
                # blank lines in it, need to detect those via indentation.
                lines.append(line)
                # the "::" line may be the last one; only consume the
                # following line when there is one
                if doclines:
                    lines.append(doclines.popleft())
                continue

        if m and to_inject is None:
            param = m.group(2).lstrip("*")
            if param in inject_params:
                # default indent to that of :param: plus one
                indent = " " * len(m.group(1)) + " "

                # but if the next line has text, use that line's
                # indentation
                if doclines:
                    m2 = re.match(r"(\s+)\S", doclines[0])
                    if m2:
                        indent = " " * len(m2.group(1))

                to_inject = indent + inject_params[param]

        lines.append(line)

    if to_inject is not None:
        # the docstring ended inside the parameter's description
        lines.extend(["\n", to_inject])

    return "\n".join(lines)

2197 

2198 

def repr_tuple_names(names: List[str]) -> Optional[str]:
    """Render a list of names as a short comma-separated string.

    At most four names are shown: for a longer list the first three and
    the last one are kept, with ``...`` standing in for the omitted
    middle.  A name longer than 11 characters is cut to its first 11
    characters followed by ``..``.  An empty list gives ``None``.

    """
    if len(names) == 0:
        return None

    def _clip(name):
        return name if len(name) <= 11 else "%s.." % name[:11]

    if len(names) <= 4:
        return ", ".join(_clip(name) for name in names)

    leading = ", ".join(_clip(name) for name in names[0:3])
    return "%s, ..., %s" % (leading, _clip(names[-1]))

2211 

2212 

def has_compiled_ext(raise_=False):
    """Report whether the cython extensions are present, as indicated by
    ``HAS_CYEXTENSION`` in the ``_has_cython`` module.

    :param raise_: when true, raise ``ImportError`` instead of returning
      ``False`` if the extensions are not present
    :return: ``True`` if the extensions are present, else ``False``
    :raises ImportError: if the extensions are absent and ``raise_`` is
      true

    """
    # imported at call time rather than at module import
    from ._has_cython import HAS_CYEXTENSION

    if HAS_CYEXTENSION:
        return True
    if not raise_:
        return False
    raise ImportError(
        "cython extensions were expected to be installed, "
        "but are not present"
    )

2225 

2226 

def load_uncompiled_module(module: _M) -> _M:
    """Load the non-compiled version of a module that is also
    compiled with cython.

    The ``.py`` source is looked up next to the ``__init__.py`` of the
    module's parent package and executed as a new module object that
    carries the same fully qualified name.

    :param module: an imported module whose parent package is present in
      ``sys.modules``
    :return: the module object created from the ``.py`` file

    """
    qualified_name = module.__name__

    module_spec = module.__spec__
    assert module_spec
    package_name = module_spec.parent
    assert package_name

    package_spec = sys.modules[package_name].__spec__
    assert package_spec
    init_path = package_spec.origin
    assert init_path and init_path.endswith("__init__.py")

    # swap the package's __init__.py for <module name>.py
    short_name = qualified_name.rpartition(".")[2]
    source_path = init_path.replace("__init__.py", f"{short_name}.py")

    source_spec = importlib.util.spec_from_file_location(
        qualified_name, source_path
    )
    assert source_spec
    uncompiled = importlib.util.module_from_spec(source_spec)
    assert source_spec.loader
    source_spec.loader.exec_module(uncompiled)
    return cast(_M, uncompiled)

2249 

2250 

2251class _Missing(enum.Enum): 

2252 Missing = enum.auto() 

2253 

2254 

# module-level alias for the sole member of ``_Missing``
Missing = _Missing.Missing

# annotation alias: either a ``_T`` or the ``_Missing.Missing`` member
MissingOr = Union[_T, Literal[_Missing.Missing]]