Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1032 statements  

1# util/langhelpers.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Routines to help with the creation, loading and introspection of 

10modules, classes, hierarchies, attributes, functions, and methods. 

11 

12""" 

13from __future__ import annotations 

14 

15import collections 

16import enum 

17from functools import update_wrapper 

18import importlib.util 

19import inspect 

20import itertools 

21import operator 

22import re 

23import sys 

24import textwrap 

25import threading 

26import types 

27from types import CodeType 

28from types import ModuleType 

29from typing import Any 

30from typing import Callable 

31from typing import cast 

32from typing import Dict 

33from typing import FrozenSet 

34from typing import Generic 

35from typing import Iterator 

36from typing import List 

37from typing import Literal 

38from typing import NoReturn 

39from typing import Optional 

40from typing import overload 

41from typing import Sequence 

42from typing import Set 

43from typing import Tuple 

44from typing import Type 

45from typing import TYPE_CHECKING 

46from typing import TypeVar 

47from typing import Union 

48import warnings 

49 

50from . import _collections 

51from . import compat 

52from .. import exc 

53 

54_T = TypeVar("_T") 

55_T_co = TypeVar("_T_co", covariant=True) 

56_F = TypeVar("_F", bound=Callable[..., Any]) 

57_MA = TypeVar("_MA", bound="HasMemoized.memoized_attribute[Any]") 

58_M = TypeVar("_M", bound=ModuleType) 

59 

60 

61def restore_annotations( 

62 cls: type, new_annotations: dict[str, Any] 

63) -> Callable[[], None]: 

64 """apply alternate annotations to a class, with a callable to restore 

65 the pristine state of the former. 

66 This is used strictly to provide dataclasses on a mapped class, where 

67 in some cases where are making dataclass fields based on an attribute 

68 that is actually a python descriptor on a superclass which we called 

69 to get a value. 

70 if dataclasses were to give us a way to achieve this without swapping 

71 __annotations__, that would be much better. 

72 """ 

73 delattr_ = object() 

74 

75 # pep-649 means classes have "__annotate__", and it's a callable. if it's 

76 # there and is None, we're in "legacy future mode", where it's python 3.14 

77 # or higher and "from __future__ import annotations" is set. in "legacy 

78 # future mode" we have to do the same steps we do for older pythons, 

79 # __annotate__ can be ignored 

80 is_pep649 = hasattr(cls, "__annotate__") and cls.__annotate__ is not None 

81 

82 if is_pep649: 

83 memoized = { 

84 "__annotate__": getattr(cls, "__annotate__", delattr_), 

85 } 

86 else: 

87 memoized = { 

88 "__annotations__": getattr(cls, "__annotations__", delattr_) 

89 } 

90 

91 cls.__annotations__ = new_annotations 

92 

93 def restore(): 

94 for k, v in memoized.items(): 

95 if v is delattr_: 

96 delattr(cls, k) 

97 else: 

98 setattr(cls, k, v) 

99 

100 return restore 

101 

102 

103def md5_hex(x: Any) -> str: 

104 x = x.encode("utf-8") 

105 m = compat.md5_not_for_security() 

106 m.update(x) 

107 return cast(str, m.hexdigest()) 

108 

109 

110class safe_reraise: 

111 """Reraise an exception after invoking some 

112 handler code. 

113 

114 Stores the existing exception info before 

115 invoking so that it is maintained across a potential 

116 coroutine context switch. 

117 

118 e.g.:: 

119 

120 try: 

121 sess.commit() 

122 except: 

123 with safe_reraise(): 

124 sess.rollback() 

125 

126 TODO: we should at some point evaluate current behaviors in this regard 

127 based on current greenlet, gevent/eventlet implementations in Python 3, and 

128 also see the degree to which our own asyncio (based on greenlet also) is 

129 impacted by this. .rollback() will cause IO / context switch to occur in 

130 all these scenarios; what happens to the exception context from an 

131 "except:" block if we don't explicitly store it? Original issue was #2703. 

132 

133 """ 

134 

135 __slots__ = ("_exc_info",) 

136 

137 _exc_info: Union[ 

138 None, 

139 Tuple[ 

140 Type[BaseException], 

141 BaseException, 

142 types.TracebackType, 

143 ], 

144 Tuple[None, None, None], 

145 ] 

146 

147 def __enter__(self) -> None: 

148 self._exc_info = sys.exc_info() 

149 

150 def __exit__( 

151 self, 

152 type_: Optional[Type[BaseException]], 

153 value: Optional[BaseException], 

154 traceback: Optional[types.TracebackType], 

155 ) -> NoReturn: 

156 assert self._exc_info is not None 

157 # see #2703 for notes 

158 if type_ is None: 

159 exc_type, exc_value, exc_tb = self._exc_info 

160 assert exc_value is not None 

161 self._exc_info = None # remove potential circular references 

162 raise exc_value.with_traceback(exc_tb) 

163 else: 

164 self._exc_info = None # remove potential circular references 

165 assert value is not None 

166 raise value.with_traceback(traceback) 

167 

168 

169def walk_subclasses(cls: Type[_T]) -> Iterator[Type[_T]]: 

170 seen: Set[Any] = set() 

171 

172 stack = [cls] 

173 while stack: 

174 cls = stack.pop() 

175 if cls in seen: 

176 continue 

177 else: 

178 seen.add(cls) 

179 stack.extend(cls.__subclasses__()) 

180 yield cls 

181 

182 

183def string_or_unprintable(element: Any) -> str: 

184 if isinstance(element, str): 

185 return element 

186 else: 

187 try: 

188 return str(element) 

189 except Exception: 

190 return "unprintable element %r" % element 

191 

192 

193def clsname_as_plain_name( 

194 cls: Type[Any], use_name: Optional[str] = None 

195) -> str: 

196 name = use_name or cls.__name__ 

197 return " ".join(n.lower() for n in re.findall(r"([A-Z][a-z]+|SQL)", name)) 

198 

199 

200def method_is_overridden( 

201 instance_or_cls: Union[Type[Any], object], 

202 against_method: Callable[..., Any], 

203) -> bool: 

204 """Return True if the two class methods don't match.""" 

205 

206 if not isinstance(instance_or_cls, type): 

207 current_cls = instance_or_cls.__class__ 

208 else: 

209 current_cls = instance_or_cls 

210 

211 method_name = against_method.__name__ 

212 

213 current_method: types.MethodType = getattr(current_cls, method_name) 

214 

215 return current_method != against_method 

216 

217 

218def decode_slice(slc: slice) -> Tuple[Any, ...]: 

219 """decode a slice object as sent to __getitem__. 

220 

221 takes into account the 2.5 __index__() method, basically. 

222 

223 """ 

224 ret: List[Any] = [] 

225 for x in slc.start, slc.stop, slc.step: 

226 if hasattr(x, "__index__"): 

227 x = x.__index__() 

228 ret.append(x) 

229 return tuple(ret) 

230 

231 

232def _unique_symbols(used: Sequence[str], *bases: str) -> Iterator[str]: 

233 used_set = set(used) 

234 for base in bases: 

235 pool = itertools.chain( 

236 (base,), 

237 map(lambda i: base + str(i), range(1000)), 

238 ) 

239 for sym in pool: 

240 if sym not in used_set: 

241 used_set.add(sym) 

242 yield sym 

243 break 

244 else: 

245 raise NameError("exhausted namespace for symbol base %s" % base) 

246 

247 

248def map_bits(fn: Callable[[int], Any], n: int) -> Iterator[Any]: 

249 """Call the given function given each nonzero bit from n.""" 

250 

251 while n: 

252 b = n & (~n + 1) 

253 yield fn(b) 

254 n ^= b 

255 

256 

257_Fn = TypeVar("_Fn", bound="Callable[..., Any]") 

258 

259# this seems to be in flux in recent mypy versions 

260 

261 

262def decorator(target: Callable[..., Any]) -> Callable[[_Fn], _Fn]: 

263 """A signature-matching decorator factory.""" 

264 

265 def decorate(fn: _Fn) -> _Fn: 

266 if not inspect.isfunction(fn) and not inspect.ismethod(fn): 

267 raise Exception("not a decoratable function") 

268 

269 # Python 3.14 defer creating __annotations__ until its used. 

270 # We do not want to create __annotations__ now. 

271 annofunc = getattr(fn, "__annotate__", None) 

272 if annofunc is not None: 

273 fn.__annotate__ = None # type: ignore[union-attr] 

274 try: 

275 spec = compat.inspect_getfullargspec(fn) 

276 finally: 

277 fn.__annotate__ = annofunc # type: ignore[union-attr] 

278 else: 

279 spec = compat.inspect_getfullargspec(fn) 

280 

281 # Do not generate code for annotations. 

282 # update_wrapper() copies the annotation from fn to decorated. 

283 # We use dummy defaults for code generation to avoid having 

284 # copy of large globals for compiling. 

285 # We copy __defaults__ and __kwdefaults__ from fn to decorated. 

286 empty_defaults = (None,) * len(spec.defaults or ()) 

287 empty_kwdefaults = dict.fromkeys(spec.kwonlydefaults or ()) 

288 spec = spec._replace( 

289 annotations={}, 

290 defaults=empty_defaults, 

291 kwonlydefaults=empty_kwdefaults, 

292 ) 

293 

294 names = ( 

295 tuple(cast("Tuple[str, ...]", spec[0])) 

296 + cast("Tuple[str, ...]", spec[1:3]) 

297 + (fn.__name__,) 

298 ) 

299 targ_name, fn_name = _unique_symbols(names, "target", "fn") 

300 

301 metadata: Dict[str, Optional[str]] = dict(target=targ_name, fn=fn_name) 

302 metadata.update(format_argspec_plus(spec, grouped=False)) 

303 metadata["name"] = fn.__name__ 

304 

305 if inspect.iscoroutinefunction(fn): 

306 metadata["prefix"] = "async " 

307 metadata["target_prefix"] = "await " 

308 else: 

309 metadata["prefix"] = "" 

310 metadata["target_prefix"] = "" 

311 

312 # look for __ positional arguments. This is a convention in 

313 # SQLAlchemy that arguments should be passed positionally 

314 # rather than as keyword 

315 # arguments. note that apply_pos doesn't currently work in all cases 

316 # such as when a kw-only indicator "*" is present, which is why 

317 # we limit the use of this to just that case we can detect. As we add 

318 # more kinds of methods that use @decorator, things may have to 

319 # be further improved in this area 

320 if "__" in repr(spec[0]): 

321 code = ( 

322 """\ 

323%(prefix)sdef %(name)s%(grouped_args)s: 

324 return %(target_prefix)s%(target)s(%(fn)s, %(apply_pos)s) 

325""" 

326 % metadata 

327 ) 

328 else: 

329 code = ( 

330 """\ 

331%(prefix)sdef %(name)s%(grouped_args)s: 

332 return %(target_prefix)s%(target)s(%(fn)s, %(apply_kw)s) 

333""" 

334 % metadata 

335 ) 

336 

337 env: Dict[str, Any] = { 

338 targ_name: target, 

339 fn_name: fn, 

340 "__name__": fn.__module__, 

341 } 

342 

343 decorated = cast( 

344 types.FunctionType, 

345 _exec_code_in_env(code, env, fn.__name__), 

346 ) 

347 decorated.__defaults__ = fn.__defaults__ 

348 decorated.__kwdefaults__ = fn.__kwdefaults__ # type: ignore 

349 return update_wrapper(decorated, fn) # type: ignore[return-value] 

350 

351 return update_wrapper(decorate, target) # type: ignore[return-value] 

352 

353 

354def _exec_code_in_env( 

355 code: Union[str, types.CodeType], env: Dict[str, Any], fn_name: str 

356) -> Callable[..., Any]: 

357 exec(code, env) 

358 return env[fn_name] # type: ignore[no-any-return] 

359 

360 

361_PF = TypeVar("_PF") 

362_TE = TypeVar("_TE") 

363 

364 

365class PluginLoader: 

366 def __init__( 

367 self, group: str, auto_fn: Optional[Callable[..., Any]] = None 

368 ): 

369 self.group = group 

370 self.impls: Dict[str, Any] = {} 

371 self.auto_fn = auto_fn 

372 

373 def clear(self): 

374 self.impls.clear() 

375 

376 def load(self, name: str) -> Any: 

377 if name in self.impls: 

378 return self.impls[name]() 

379 

380 if self.auto_fn: 

381 loader = self.auto_fn(name) 

382 if loader: 

383 self.impls[name] = loader 

384 return loader() 

385 

386 for impl in compat.importlib_metadata_get(self.group): 

387 if impl.name == name: 

388 self.impls[name] = impl.load 

389 return impl.load() 

390 

391 raise exc.NoSuchModuleError( 

392 "Can't load plugin: %s:%s" % (self.group, name) 

393 ) 

394 

395 def register(self, name: str, modulepath: str, objname: str) -> None: 

396 def load(): 

397 mod = __import__(modulepath) 

398 for token in modulepath.split(".")[1:]: 

399 mod = getattr(mod, token) 

400 return getattr(mod, objname) 

401 

402 self.impls[name] = load 

403 

404 def deregister(self, name: str) -> None: 

405 del self.impls[name] 

406 

407 

408def _inspect_func_args(fn): 

409 try: 

410 co_varkeywords = inspect.CO_VARKEYWORDS 

411 except AttributeError: 

412 # https://docs.python.org/3/library/inspect.html 

413 # The flags are specific to CPython, and may not be defined in other 

414 # Python implementations. Furthermore, the flags are an implementation 

415 # detail, and can be removed or deprecated in future Python releases. 

416 spec = compat.inspect_getfullargspec(fn) 

417 return spec[0], bool(spec[2]) 

418 else: 

419 # use fn.__code__ plus flags to reduce method call overhead 

420 co = fn.__code__ 

421 nargs = co.co_argcount 

422 return ( 

423 list(co.co_varnames[:nargs]), 

424 bool(co.co_flags & co_varkeywords), 

425 ) 

426 

427 

428@overload 

429def get_cls_kwargs( 

430 cls: type, 

431 *, 

432 _set: Optional[Set[str]] = None, 

433 raiseerr: Literal[True] = ..., 

434) -> Set[str]: ... 

435 

436 

437@overload 

438def get_cls_kwargs( 

439 cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False 

440) -> Optional[Set[str]]: ... 

441 

442 

443def get_cls_kwargs( 

444 cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False 

445) -> Optional[Set[str]]: 

446 r"""Return the full set of inherited kwargs for the given `cls`. 

447 

448 Probes a class's __init__ method, collecting all named arguments. If the 

449 __init__ defines a \**kwargs catch-all, then the constructor is presumed 

450 to pass along unrecognized keywords to its base classes, and the 

451 collection process is repeated recursively on each of the bases. 

452 

453 Uses a subset of inspect.getfullargspec() to cut down on method overhead, 

454 as this is used within the Core typing system to create copies of type 

455 objects which is a performance-sensitive operation. 

456 

457 No anonymous tuple arguments please ! 

458 

459 """ 

460 toplevel = _set is None 

461 if toplevel: 

462 _set = set() 

463 assert _set is not None 

464 

465 ctr = cls.__dict__.get("__init__", False) 

466 

467 has_init = ( 

468 ctr 

469 and isinstance(ctr, types.FunctionType) 

470 and isinstance(ctr.__code__, types.CodeType) 

471 ) 

472 

473 if has_init: 

474 names, has_kw = _inspect_func_args(ctr) 

475 _set.update(names) 

476 

477 if not has_kw and not toplevel: 

478 if raiseerr: 

479 raise TypeError( 

480 f"given cls {cls} doesn't have an __init__ method" 

481 ) 

482 else: 

483 return None 

484 else: 

485 has_kw = False 

486 

487 if not has_init or has_kw: 

488 for c in cls.__bases__: 

489 if get_cls_kwargs(c, _set=_set) is None: 

490 break 

491 

492 _set.discard("self") 

493 return _set 

494 

495 

496def get_func_kwargs(func: Callable[..., Any]) -> List[str]: 

497 """Return the set of legal kwargs for the given `func`. 

498 

499 Uses getargspec so is safe to call for methods, functions, 

500 etc. 

501 

502 """ 

503 

504 return compat.inspect_getfullargspec(func)[0] 

505 

506 

507def get_callable_argspec( 

508 fn: Callable[..., Any], no_self: bool = False, _is_init: bool = False 

509) -> compat.FullArgSpec: 

510 """Return the argument signature for any callable. 

511 

512 All pure-Python callables are accepted, including 

513 functions, methods, classes, objects with __call__; 

514 builtins and other edge cases like functools.partial() objects 

515 raise a TypeError. 

516 

517 """ 

518 if inspect.isbuiltin(fn): 

519 raise TypeError("Can't inspect builtin: %s" % fn) 

520 elif inspect.isfunction(fn): 

521 if _is_init and no_self: 

522 spec = compat.inspect_getfullargspec(fn) 

523 return compat.FullArgSpec( 

524 spec.args[1:], 

525 spec.varargs, 

526 spec.varkw, 

527 spec.defaults, 

528 spec.kwonlyargs, 

529 spec.kwonlydefaults, 

530 spec.annotations, 

531 ) 

532 else: 

533 return compat.inspect_getfullargspec(fn) 

534 elif inspect.ismethod(fn): 

535 if no_self and (_is_init or fn.__self__): 

536 spec = compat.inspect_getfullargspec(fn.__func__) 

537 return compat.FullArgSpec( 

538 spec.args[1:], 

539 spec.varargs, 

540 spec.varkw, 

541 spec.defaults, 

542 spec.kwonlyargs, 

543 spec.kwonlydefaults, 

544 spec.annotations, 

545 ) 

546 else: 

547 return compat.inspect_getfullargspec(fn.__func__) 

548 elif inspect.isclass(fn): 

549 return get_callable_argspec( 

550 fn.__init__, no_self=no_self, _is_init=True 

551 ) 

552 elif hasattr(fn, "__func__"): 

553 return compat.inspect_getfullargspec(fn.__func__) 

554 elif hasattr(fn, "__call__"): 

555 if inspect.ismethod(fn.__call__): 

556 return get_callable_argspec(fn.__call__, no_self=no_self) 

557 else: 

558 raise TypeError("Can't inspect callable: %s" % fn) 

559 else: 

560 raise TypeError("Can't inspect callable: %s" % fn) 

561 

562 

563def format_argspec_plus( 

564 fn: Union[Callable[..., Any], compat.FullArgSpec], grouped: bool = True 

565) -> Dict[str, Optional[str]]: 

566 """Returns a dictionary of formatted, introspected function arguments. 

567 

568 A enhanced variant of inspect.formatargspec to support code generation. 

569 

570 fn 

571 An inspectable callable or tuple of inspect getargspec() results. 

572 grouped 

573 Defaults to True; include (parens, around, argument) lists 

574 

575 Returns: 

576 

577 args 

578 Full inspect.formatargspec for fn 

579 self_arg 

580 The name of the first positional argument, varargs[0], or None 

581 if the function defines no positional arguments. 

582 apply_pos 

583 args, re-written in calling rather than receiving syntax. Arguments are 

584 passed positionally. 

585 apply_kw 

586 Like apply_pos, except keyword-ish args are passed as keywords. 

587 apply_pos_proxied 

588 Like apply_pos but omits the self/cls argument 

589 

590 Example:: 

591 

592 >>> format_argspec_plus(lambda self, a, b, c=3, **d: 123) 

593 {'grouped_args': '(self, a, b, c=3, **d)', 

594 'self_arg': 'self', 

595 'apply_kw': '(self, a, b, c=c, **d)', 

596 'apply_pos': '(self, a, b, c, **d)'} 

597 

598 """ 

599 if callable(fn): 

600 spec = compat.inspect_getfullargspec(fn) 

601 else: 

602 spec = fn 

603 

604 args = compat.inspect_formatargspec(*spec) 

605 

606 apply_pos = compat.inspect_formatargspec( 

607 spec[0], spec[1], spec[2], None, spec[4] 

608 ) 

609 

610 if spec[0]: 

611 self_arg = spec[0][0] 

612 

613 apply_pos_proxied = compat.inspect_formatargspec( 

614 spec[0][1:], spec[1], spec[2], None, spec[4] 

615 ) 

616 

617 elif spec[1]: 

618 # I'm not sure what this is 

619 self_arg = "%s[0]" % spec[1] 

620 

621 apply_pos_proxied = apply_pos 

622 else: 

623 self_arg = None 

624 apply_pos_proxied = apply_pos 

625 

626 num_defaults = 0 

627 if spec[3]: 

628 num_defaults += len(cast(Tuple[Any], spec[3])) 

629 if spec[4]: 

630 num_defaults += len(spec[4]) 

631 

632 name_args = spec[0] + spec[4] 

633 

634 defaulted_vals: Union[List[str], Tuple[()]] 

635 

636 if num_defaults: 

637 defaulted_vals = name_args[0 - num_defaults :] 

638 else: 

639 defaulted_vals = () 

640 

641 apply_kw = compat.inspect_formatargspec( 

642 name_args, 

643 spec[1], 

644 spec[2], 

645 defaulted_vals, 

646 formatvalue=lambda x: "=" + str(x), 

647 ) 

648 

649 if spec[0]: 

650 apply_kw_proxied = compat.inspect_formatargspec( 

651 name_args[1:], 

652 spec[1], 

653 spec[2], 

654 defaulted_vals, 

655 formatvalue=lambda x: "=" + str(x), 

656 ) 

657 else: 

658 apply_kw_proxied = apply_kw 

659 

660 if grouped: 

661 return dict( 

662 grouped_args=args, 

663 self_arg=self_arg, 

664 apply_pos=apply_pos, 

665 apply_kw=apply_kw, 

666 apply_pos_proxied=apply_pos_proxied, 

667 apply_kw_proxied=apply_kw_proxied, 

668 ) 

669 else: 

670 return dict( 

671 grouped_args=args, 

672 self_arg=self_arg, 

673 apply_pos=apply_pos[1:-1], 

674 apply_kw=apply_kw[1:-1], 

675 apply_pos_proxied=apply_pos_proxied[1:-1], 

676 apply_kw_proxied=apply_kw_proxied[1:-1], 

677 ) 

678 

679 

680def format_argspec_init(method, grouped=True): 

681 """format_argspec_plus with considerations for typical __init__ methods 

682 

683 Wraps format_argspec_plus with error handling strategies for typical 

684 __init__ cases: 

685 

686 .. sourcecode:: text 

687 

688 object.__init__ -> (self) 

689 other unreflectable (usually C) -> (self, *args, **kwargs) 

690 

691 """ 

692 if method is object.__init__: 

693 grouped_args = "(self)" 

694 args = "(self)" if grouped else "self" 

695 proxied = "()" if grouped else "" 

696 else: 

697 try: 

698 return format_argspec_plus(method, grouped=grouped) 

699 except TypeError: 

700 grouped_args = "(self, *args, **kwargs)" 

701 args = grouped_args if grouped else "self, *args, **kwargs" 

702 proxied = "(*args, **kwargs)" if grouped else "*args, **kwargs" 

703 return dict( 

704 self_arg="self", 

705 grouped_args=grouped_args, 

706 apply_pos=args, 

707 apply_kw=args, 

708 apply_pos_proxied=proxied, 

709 apply_kw_proxied=proxied, 

710 ) 

711 

712 

713def create_proxy_methods( 

714 target_cls: Type[Any], 

715 target_cls_sphinx_name: str, 

716 proxy_cls_sphinx_name: str, 

717 classmethods: Sequence[str] = (), 

718 methods: Sequence[str] = (), 

719 attributes: Sequence[str] = (), 

720 use_intermediate_variable: Sequence[str] = (), 

721) -> Callable[[_T], _T]: 

722 """A class decorator indicating attributes should refer to a proxy 

723 class. 

724 

725 This decorator is now a "marker" that does nothing at runtime. Instead, 

726 it is consumed by the tools/generate_proxy_methods.py script to 

727 statically generate proxy methods and attributes that are fully 

728 recognized by typing tools such as mypy. 

729 

730 """ 

731 

732 def decorate(cls): 

733 return cls 

734 

735 return decorate 

736 

737 

738def getargspec_init(method): 

739 """inspect.getargspec with considerations for typical __init__ methods 

740 

741 Wraps inspect.getargspec with error handling for typical __init__ cases: 

742 

743 .. sourcecode:: text 

744 

745 object.__init__ -> (self) 

746 other unreflectable (usually C) -> (self, *args, **kwargs) 

747 

748 """ 

749 try: 

750 return compat.inspect_getfullargspec(method) 

751 except TypeError: 

752 if method is object.__init__: 

753 return (["self"], None, None, None) 

754 else: 

755 return (["self"], "args", "kwargs", None) 

756 

757 

758def unbound_method_to_callable(func_or_cls): 

759 """Adjust the incoming callable such that a 'self' argument is not 

760 required. 

761 

762 """ 

763 

764 if isinstance(func_or_cls, types.MethodType) and not func_or_cls.__self__: 

765 return func_or_cls.__func__ 

766 else: 

767 return func_or_cls 

768 

769 

770class GenericRepr: 

771 """Encapsulates the logic for creating a generic __repr__() string. 

772 

773 This class allows for the repr structure to be created, then modified 

774 (e.g., changing the class name), before being rendered as a string. 

775 

776 .. versionadded:: 2.1 

777 """ 

778 

779 __slots__ = ( 

780 "_obj", 

781 "_additional_kw", 

782 "_to_inspect", 

783 "_omit_kwarg", 

784 "_class_name", 

785 ) 

786 

787 _obj: Any 

788 _additional_kw: Sequence[Tuple[str, Any]] 

789 _to_inspect: List[object] 

790 _omit_kwarg: Sequence[str] 

791 _class_name: Optional[str] 

792 

793 def __init__( 

794 self, 

795 obj: Any, 

796 additional_kw: Sequence[Tuple[str, Any]] = (), 

797 to_inspect: Optional[Union[object, List[object]]] = None, 

798 omit_kwarg: Sequence[str] = (), 

799 ): 

800 """Create a GenericRepr object. 

801 

802 :param obj: The object being repr'd 

803 :param additional_kw: Additional keyword arguments to check for in 

804 the repr, as a sequence of 2-tuples of (name, default_value) 

805 :param to_inspect: One or more objects whose __init__ signature 

806 should be inspected. If not provided, defaults to [obj]. 

807 :param omit_kwarg: Sequence of keyword argument names to omit from 

808 the repr output 

809 """ 

810 self._obj = obj 

811 self._additional_kw = additional_kw 

812 self._to_inspect = ( 

813 [obj] if to_inspect is None else _collections.to_list(to_inspect) 

814 ) 

815 self._omit_kwarg = omit_kwarg 

816 self._class_name = None 

817 

818 def set_class_name(self, class_name: str) -> GenericRepr: 

819 """Set the class name to be used in the repr. 

820 

821 By default, the class name is taken from obj.__class__.__name__. 

822 This method allows it to be overridden. 

823 

824 :param class_name: The class name to use 

825 :return: self, for method chaining 

826 """ 

827 self._class_name = class_name 

828 return self 

829 

830 def __str__(self) -> str: 

831 """Produce the __repr__() string based on the configured parameters.""" 

832 obj = self._obj 

833 to_inspect = self._to_inspect 

834 additional_kw = self._additional_kw 

835 omit_kwarg = self._omit_kwarg 

836 

837 missing = object() 

838 

839 pos_args = [] 

840 kw_args: _collections.OrderedDict[str, Any] = ( 

841 _collections.OrderedDict() 

842 ) 

843 vargs = None 

844 for i, insp in enumerate(to_inspect): 

845 try: 

846 spec = compat.inspect_getfullargspec(insp.__init__) # type: ignore[misc] # noqa: E501 

847 except TypeError: 

848 continue 

849 else: 

850 default_len = len(spec.defaults) if spec.defaults else 0 

851 if i == 0: 

852 if spec.varargs: 

853 vargs = spec.varargs 

854 if default_len: 

855 pos_args.extend(spec.args[1:-default_len]) 

856 else: 

857 pos_args.extend(spec.args[1:]) 

858 else: 

859 kw_args.update( 

860 [(arg, missing) for arg in spec.args[1:-default_len]] 

861 ) 

862 

863 if default_len: 

864 assert spec.defaults 

865 kw_args.update( 

866 [ 

867 (arg, default) 

868 for arg, default in zip( 

869 spec.args[-default_len:], spec.defaults 

870 ) 

871 ] 

872 ) 

873 output: List[str] = [] 

874 

875 output.extend(repr(getattr(obj, arg, None)) for arg in pos_args) 

876 

877 if vargs is not None and hasattr(obj, vargs): 

878 output.extend([repr(val) for val in getattr(obj, vargs)]) 

879 

880 for arg, defval in kw_args.items(): 

881 if arg in omit_kwarg: 

882 continue 

883 try: 

884 val = getattr(obj, arg, missing) 

885 if val is not missing and val != defval: 

886 output.append("%s=%r" % (arg, val)) 

887 except Exception: 

888 pass 

889 

890 if additional_kw: 

891 for arg, defval in additional_kw: 

892 try: 

893 val = getattr(obj, arg, missing) 

894 if val is not missing and val != defval: 

895 output.append("%s=%r" % (arg, val)) 

896 except Exception: 

897 pass 

898 

899 class_name = ( 

900 self._class_name 

901 if self._class_name is not None 

902 else obj.__class__.__name__ 

903 ) 

904 return "%s(%s)" % (class_name, ", ".join(output)) 

905 

906 

907def generic_repr( 

908 obj: Any, 

909 additional_kw: Sequence[Tuple[str, Any]] = (), 

910 to_inspect: Optional[Union[object, List[object]]] = None, 

911 omit_kwarg: Sequence[str] = (), 

912) -> str: 

913 """Produce a __repr__() based on direct association of the __init__() 

914 specification vs. same-named attributes present. 

915 

916 """ 

917 return str( 

918 GenericRepr( 

919 obj, 

920 additional_kw=additional_kw, 

921 to_inspect=to_inspect, 

922 omit_kwarg=omit_kwarg, 

923 ) 

924 ) 

925 

926 

927def class_hierarchy(cls): 

928 """Return an unordered sequence of all classes related to cls. 

929 

930 Traverses diamond hierarchies. 

931 

932 Fibs slightly: subclasses of builtin types are not returned. Thus 

933 class_hierarchy(class A(object)) returns (A, object), not A plus every 

934 class systemwide that derives from object. 

935 

936 """ 

937 

938 hier = {cls} 

939 process = list(cls.__mro__) 

940 while process: 

941 c = process.pop() 

942 bases = (_ for _ in c.__bases__ if _ not in hier) 

943 

944 for b in bases: 

945 process.append(b) 

946 hier.add(b) 

947 

948 if c.__module__ == "builtins" or not hasattr(c, "__subclasses__"): 

949 continue 

950 

951 for s in [ 

952 _ 

953 for _ in ( 

954 c.__subclasses__() 

955 if not issubclass(c, type) 

956 else c.__subclasses__(c) 

957 ) 

958 if _ not in hier 

959 ]: 

960 process.append(s) 

961 hier.add(s) 

962 return list(hier) 

963 

964 

965def iterate_attributes(cls): 

966 """iterate all the keys and attributes associated 

967 with a class, without using getattr(). 

968 

969 Does not use getattr() so that class-sensitive 

970 descriptors (i.e. property.__get__()) are not called. 

971 

972 """ 

973 keys = dir(cls) 

974 for key in keys: 

975 for c in cls.__mro__: 

976 if key in c.__dict__: 

977 yield (key, c.__dict__[key]) 

978 break 

979 

980 

981def monkeypatch_proxied_specials( 

982 into_cls, 

983 from_cls, 

984 skip=None, 

985 only=None, 

986 name="self.proxy", 

987 from_instance=None, 

988): 

989 """Automates delegation of __specials__ for a proxying type.""" 

990 

991 if only: 

992 dunders = only 

993 else: 

994 if skip is None: 

995 skip = ( 

996 "__slots__", 

997 "__del__", 

998 "__getattribute__", 

999 "__metaclass__", 

1000 "__getstate__", 

1001 "__setstate__", 

1002 ) 

1003 dunders = [ 

1004 m 

1005 for m in dir(from_cls) 

1006 if ( 

1007 m.startswith("__") 

1008 and m.endswith("__") 

1009 and not hasattr(into_cls, m) 

1010 and m not in skip 

1011 ) 

1012 ] 

1013 

1014 for method in dunders: 

1015 try: 

1016 maybe_fn = getattr(from_cls, method) 

1017 if not hasattr(maybe_fn, "__call__"): 

1018 continue 

1019 maybe_fn = getattr(maybe_fn, "__func__", maybe_fn) 

1020 fn = cast(types.FunctionType, maybe_fn) 

1021 

1022 except AttributeError: 

1023 continue 

1024 try: 

1025 spec = compat.inspect_getfullargspec(fn) 

1026 fn_args = compat.inspect_formatargspec(spec[0]) 

1027 d_args = compat.inspect_formatargspec(spec[0][1:]) 

1028 except TypeError: 

1029 fn_args = "(self, *args, **kw)" 

1030 d_args = "(*args, **kw)" 

1031 

1032 py = ( 

1033 "def %(method)s%(fn_args)s: " 

1034 "return %(name)s.%(method)s%(d_args)s" % locals() 

1035 ) 

1036 

1037 env: Dict[str, types.FunctionType] = ( 

1038 from_instance is not None and {name: from_instance} or {} 

1039 ) 

1040 exec(py, env) 

1041 try: 

1042 env[method].__defaults__ = fn.__defaults__ 

1043 except AttributeError: 

1044 pass 

1045 setattr(into_cls, method, env[method]) 

1046 

1047 

1048def methods_equivalent(meth1, meth2): 

1049 """Return True if the two methods are the same implementation.""" 

1050 

1051 return getattr(meth1, "__func__", meth1) is getattr( 

1052 meth2, "__func__", meth2 

1053 ) 

1054 

1055 

1056def as_interface(obj, cls=None, methods=None, required=None): 

1057 """Ensure basic interface compliance for an instance or dict of callables. 

1058 

1059 Checks that ``obj`` implements public methods of ``cls`` or has members 

1060 listed in ``methods``. If ``required`` is not supplied, implementing at 

1061 least one interface method is sufficient. Methods present on ``obj`` that 

1062 are not in the interface are ignored. 

1063 

1064 If ``obj`` is a dict and ``dict`` does not meet the interface 

1065 requirements, the keys of the dictionary are inspected. Keys present in 

1066 ``obj`` that are not in the interface will raise TypeErrors. 

1067 

1068 Raises TypeError if ``obj`` does not meet the interface criteria. 

1069 

1070 In all passing cases, an object with callable members is returned. In the 

1071 simple case, ``obj`` is returned as-is; if dict processing kicks in then 

1072 an anonymous class is returned. 

1073 

1074 obj 

1075 A type, instance, or dictionary of callables. 

1076 cls 

1077 Optional, a type. All public methods of cls are considered the 

1078 interface. An ``obj`` instance of cls will always pass, ignoring 

1079 ``required``.. 

1080 methods 

1081 Optional, a sequence of method names to consider as the interface. 

1082 required 

1083 Optional, a sequence of mandatory implementations. If omitted, an 

1084 ``obj`` that provides at least one interface method is considered 

1085 sufficient. As a convenience, required may be a type, in which case 

1086 all public methods of the type are required. 

1087 

1088 """ 

1089 if not cls and not methods: 

1090 raise TypeError("a class or collection of method names are required") 

1091 

1092 if isinstance(cls, type) and isinstance(obj, cls): 

1093 return obj 

1094 

1095 interface = set(methods or [m for m in dir(cls) if not m.startswith("_")]) 

1096 implemented = set(dir(obj)) 

1097 

1098 complies = operator.ge 

1099 if isinstance(required, type): 

1100 required = interface 

1101 elif not required: 

1102 required = set() 

1103 complies = operator.gt 

1104 else: 

1105 required = set(required) 

1106 

1107 if complies(implemented.intersection(interface), required): 

1108 return obj 

1109 

1110 # No dict duck typing here. 

1111 if not isinstance(obj, dict): 

1112 qualifier = complies is operator.gt and "any of" or "all of" 

1113 raise TypeError( 

1114 "%r does not implement %s: %s" 

1115 % (obj, qualifier, ", ".join(interface)) 

1116 ) 

1117 

1118 class AnonymousInterface: 

1119 """A callable-holding shell.""" 

1120 

1121 if cls: 

1122 AnonymousInterface.__name__ = "Anonymous" + cls.__name__ 

1123 found = set() 

1124 

1125 for method, impl in dictlike_iteritems(obj): 

1126 if method not in interface: 

1127 raise TypeError("%r: unknown in this interface" % method) 

1128 if not callable(impl): 

1129 raise TypeError("%r=%r is not callable" % (method, impl)) 

1130 setattr(AnonymousInterface, method, staticmethod(impl)) 

1131 found.add(method) 

1132 

1133 if complies(found, required): 

1134 return AnonymousInterface 

1135 

1136 raise TypeError( 

1137 "dictionary does not contain required keys %s" 

1138 % ", ".join(required - found) 

1139 ) 

1140 

1141 

1142_GFD = TypeVar("_GFD", bound="generic_fn_descriptor[Any]") 

1143 

1144 

1145class generic_fn_descriptor(Generic[_T_co]): 

1146 """Descriptor which proxies a function when the attribute is not 

1147 present in dict 

1148 

1149 This superclass is organized in a particular way with "memoized" and 

1150 "non-memoized" implementation classes that are hidden from type checkers, 

1151 as Mypy seems to not be able to handle seeing multiple kinds of descriptor 

1152 classes used for the same attribute. 

1153 

1154 """ 

1155 

1156 fget: Callable[..., _T_co] 

1157 __doc__: Optional[str] 

1158 __name__: str 

1159 

1160 def __init__(self, fget: Callable[..., _T_co], doc: Optional[str] = None): 

1161 self.fget = fget 

1162 self.__doc__ = doc or fget.__doc__ 

1163 self.__name__ = fget.__name__ 

1164 

1165 @overload 

1166 def __get__(self: _GFD, obj: None, cls: Any) -> _GFD: ... 

1167 

1168 @overload 

1169 def __get__(self, obj: object, cls: Any) -> _T_co: ... 

1170 

1171 def __get__(self: _GFD, obj: Any, cls: Any) -> Union[_GFD, _T_co]: 

1172 raise NotImplementedError() 

1173 

1174 if TYPE_CHECKING: 

1175 

1176 def __set__(self, instance: Any, value: Any) -> None: ... 

1177 

1178 def __delete__(self, instance: Any) -> None: ... 

1179 

1180 def _reset(self, obj: Any) -> None: 

1181 raise NotImplementedError() 

1182 

1183 @classmethod 

1184 def reset(cls, obj: Any, name: str) -> None: 

1185 raise NotImplementedError() 

1186 

1187 

1188class _non_memoized_property(generic_fn_descriptor[_T_co]): 

1189 """a plain descriptor that proxies a function. 

1190 

1191 primary rationale is to provide a plain attribute that's 

1192 compatible with memoized_property which is also recognized as equivalent 

1193 by mypy. 

1194 

1195 """ 

1196 

1197 if not TYPE_CHECKING: 

1198 

1199 def __get__(self, obj, cls): 

1200 if obj is None: 

1201 return self 

1202 return self.fget(obj) 

1203 

1204 

1205class _memoized_property(generic_fn_descriptor[_T_co]): 

1206 """A read-only @property that is only evaluated once.""" 

1207 

1208 if not TYPE_CHECKING: 

1209 

1210 def __get__(self, obj, cls): 

1211 if obj is None: 

1212 return self 

1213 obj.__dict__[self.__name__] = result = self.fget(obj) 

1214 return result 

1215 

1216 def _reset(self, obj): 

1217 _memoized_property.reset(obj, self.__name__) 

1218 

1219 @classmethod 

1220 def reset(cls, obj, name): 

1221 obj.__dict__.pop(name, None) 

1222 

1223 

1224# despite many attempts to get Mypy to recognize an overridden descriptor 

1225# where one is memoized and the other isn't, there seems to be no reliable 

1226# way other than completely deceiving the type checker into thinking there 

1227# is just one single descriptor type everywhere. Otherwise, if a superclass 

1228# has non-memoized and subclass has memoized, that requires 

1229# "class memoized(non_memoized)". but then if a superclass has memoized and 

1230# superclass has non-memoized, the class hierarchy of the descriptors 

1231# would need to be reversed; "class non_memoized(memoized)". so there's no 

1232# way to achieve this. 

1233# additional issues, RO properties: 

1234# https://github.com/python/mypy/issues/12440 

1235if TYPE_CHECKING: 

1236 # allow memoized and non-memoized to be freely mixed by having them 

1237 # be the same class 

1238 memoized_property = generic_fn_descriptor 

1239 non_memoized_property = generic_fn_descriptor 

1240 

1241 # for read only situations, mypy only sees @property as read only. 

1242 # read only is needed when a subtype specializes the return type 

1243 # of a property, meaning assignment needs to be disallowed 

1244 ro_memoized_property = property 

1245 ro_non_memoized_property = property 

1246 

1247else: 

1248 memoized_property = ro_memoized_property = _memoized_property 

1249 non_memoized_property = ro_non_memoized_property = _non_memoized_property 

1250 

1251 

1252def memoized_instancemethod(fn: _F) -> _F: 

1253 """Decorate a method memoize its return value. 

1254 

1255 Best applied to no-arg methods: memoization is not sensitive to 

1256 argument values, and will always return the same value even when 

1257 called with different arguments. 

1258 

1259 """ 

1260 

1261 def oneshot(self, *args, **kw): 

1262 result = fn(self, *args, **kw) 

1263 

1264 def memo(*a, **kw): 

1265 return result 

1266 

1267 memo.__name__ = fn.__name__ 

1268 memo.__doc__ = fn.__doc__ 

1269 self.__dict__[fn.__name__] = memo 

1270 return result 

1271 

1272 return update_wrapper(oneshot, fn) # type: ignore 

1273 

1274 

1275class HasMemoized: 

1276 """A mixin class that maintains the names of memoized elements in a 

1277 collection for easy cache clearing, generative, etc. 

1278 

1279 """ 

1280 

1281 if not TYPE_CHECKING: 

1282 # support classes that want to have __slots__ with an explicit 

1283 # slot for __dict__. not sure if that requires base __slots__ here. 

1284 __slots__ = () 

1285 

1286 _memoized_keys: FrozenSet[str] = frozenset() 

1287 

1288 def _reset_memoizations(self) -> None: 

1289 for elem in self._memoized_keys: 

1290 self.__dict__.pop(elem, None) 

1291 

1292 def _assert_no_memoizations(self) -> None: 

1293 for elem in self._memoized_keys: 

1294 assert elem not in self.__dict__ 

1295 

1296 def _set_memoized_attribute(self, key: str, value: Any) -> None: 

1297 self.__dict__[key] = value 

1298 self._memoized_keys |= {key} 

1299 

1300 class memoized_attribute(memoized_property[_T]): 

1301 """A read-only @property that is only evaluated once. 

1302 

1303 :meta private: 

1304 

1305 """ 

1306 

1307 fget: Callable[..., _T] 

1308 __doc__: Optional[str] 

1309 __name__: str 

1310 

1311 def __init__(self, fget: Callable[..., _T], doc: Optional[str] = None): 

1312 self.fget = fget 

1313 self.__doc__ = doc or fget.__doc__ 

1314 self.__name__ = fget.__name__ 

1315 

1316 @overload 

1317 def __get__(self: _MA, obj: None, cls: Any) -> _MA: ... 

1318 

1319 @overload 

1320 def __get__(self, obj: Any, cls: Any) -> _T: ... 

1321 

1322 def __get__(self, obj, cls): 

1323 if obj is None: 

1324 return self 

1325 obj.__dict__[self.__name__] = result = self.fget(obj) 

1326 obj._memoized_keys |= {self.__name__} 

1327 return result 

1328 

1329 @classmethod 

1330 def memoized_instancemethod(cls, fn: _F) -> _F: 

1331 """Decorate a method memoize its return value. 

1332 

1333 :meta private: 

1334 

1335 """ 

1336 

1337 def oneshot(self: Any, *args: Any, **kw: Any) -> Any: 

1338 result = fn(self, *args, **kw) 

1339 

1340 def memo(*a, **kw): 

1341 return result 

1342 

1343 memo.__name__ = fn.__name__ 

1344 memo.__doc__ = fn.__doc__ 

1345 self.__dict__[fn.__name__] = memo 

1346 self._memoized_keys |= {fn.__name__} 

1347 return result 

1348 

1349 return update_wrapper(oneshot, fn) # type: ignore 

1350 

1351 

1352if TYPE_CHECKING: 

1353 HasMemoized_ro_memoized_attribute = property 

1354else: 

1355 HasMemoized_ro_memoized_attribute = HasMemoized.memoized_attribute 

1356 

1357 

1358class MemoizedSlots: 

1359 """Apply memoized items to an object using a __getattr__ scheme. 

1360 

1361 This allows the functionality of memoized_property and 

1362 memoized_instancemethod to be available to a class using __slots__. 

1363 

1364 The memoized get is not threadsafe under freethreading and the 

1365 creator method may in extremely rare cases be called more than once. 

1366 

1367 """ 

1368 

1369 __slots__ = () 

1370 

1371 def _fallback_getattr(self, key): 

1372 raise AttributeError(key) 

1373 

1374 def __getattr__(self, key: str) -> Any: 

1375 if key.startswith("_memoized_attr_") or key.startswith( 

1376 "_memoized_method_" 

1377 ): 

1378 raise AttributeError(key) 

1379 # to avoid recursion errors when interacting with other __getattr__ 

1380 # schemes that refer to this one, when testing for memoized method 

1381 # look at __class__ only rather than going into __getattr__ again. 

1382 elif hasattr(self.__class__, f"_memoized_attr_{key}"): 

1383 value = getattr(self, f"_memoized_attr_{key}")() 

1384 setattr(self, key, value) 

1385 return value 

1386 elif hasattr(self.__class__, f"_memoized_method_{key}"): 

1387 meth = getattr(self, f"_memoized_method_{key}") 

1388 

1389 def oneshot(*args, **kw): 

1390 result = meth(*args, **kw) 

1391 

1392 def memo(*a, **kw): 

1393 return result 

1394 

1395 memo.__name__ = meth.__name__ 

1396 memo.__doc__ = meth.__doc__ 

1397 setattr(self, key, memo) 

1398 return result 

1399 

1400 oneshot.__doc__ = meth.__doc__ 

1401 return oneshot 

1402 else: 

1403 return self._fallback_getattr(key) 

1404 

1405 

1406# from paste.deploy.converters 

1407def asbool(obj: Any) -> bool: 

1408 if isinstance(obj, str): 

1409 obj = obj.strip().lower() 

1410 if obj in ["true", "yes", "on", "y", "t", "1"]: 

1411 return True 

1412 elif obj in ["false", "no", "off", "n", "f", "0"]: 

1413 return False 

1414 else: 

1415 raise ValueError("String is not true/false: %r" % obj) 

1416 return bool(obj) 

1417 

1418 

1419def bool_or_str(*text: str) -> Callable[[str], Union[str, bool]]: 

1420 """Return a callable that will evaluate a string as 

1421 boolean, or one of a set of "alternate" string values. 

1422 

1423 """ 

1424 

1425 def bool_or_value(obj: str) -> Union[str, bool]: 

1426 if obj in text: 

1427 return obj 

1428 else: 

1429 return asbool(obj) 

1430 

1431 return bool_or_value 

1432 

1433 

1434def asint(value: Any) -> Optional[int]: 

1435 """Coerce to integer.""" 

1436 

1437 if value is None: 

1438 return value 

1439 return int(value) 

1440 

1441 

1442def coerce_kw_type( 

1443 kw: Dict[str, Any], 

1444 key: str, 

1445 type_: Type[Any], 

1446 flexi_bool: bool = True, 

1447 dest: Optional[Dict[str, Any]] = None, 

1448) -> None: 

1449 r"""If 'key' is present in dict 'kw', coerce its value to type 'type\_' if 

1450 necessary. If 'flexi_bool' is True, the string '0' is considered false 

1451 when coercing to boolean. 

1452 """ 

1453 

1454 if dest is None: 

1455 dest = kw 

1456 

1457 if ( 

1458 key in kw 

1459 and (not isinstance(type_, type) or not isinstance(kw[key], type_)) 

1460 and kw[key] is not None 

1461 ): 

1462 if type_ is bool and flexi_bool: 

1463 dest[key] = asbool(kw[key]) 

1464 else: 

1465 dest[key] = type_(kw[key]) 

1466 

1467 

1468def constructor_key(obj: Any, cls: Type[Any]) -> Tuple[Any, ...]: 

1469 """Produce a tuple structure that is cacheable using the __dict__ of 

1470 obj to retrieve values 

1471 

1472 """ 

1473 names = get_cls_kwargs(cls) 

1474 return (cls,) + tuple( 

1475 (k, obj.__dict__[k]) for k in names if k in obj.__dict__ 

1476 ) 

1477 

1478 

1479def constructor_copy(obj: _T, cls: Type[_T], *args: Any, **kw: Any) -> _T: 

1480 """Instantiate cls using the __dict__ of obj as constructor arguments. 

1481 

1482 Uses inspect to match the named arguments of ``cls``. 

1483 

1484 """ 

1485 

1486 names = get_cls_kwargs(cls) 

1487 kw.update( 

1488 (k, obj.__dict__[k]) for k in names.difference(kw) if k in obj.__dict__ 

1489 ) 

1490 return cls(*args, **kw) 

1491 

1492 

1493def counter() -> Callable[[], int]: 

1494 """Return a threadsafe counter function.""" 

1495 

1496 lock = threading.Lock() 

1497 counter = itertools.count(1) 

1498 

1499 # avoid the 2to3 "next" transformation... 

1500 def _next(): 

1501 with lock: 

1502 return next(counter) 

1503 

1504 return _next 

1505 

1506 

1507def duck_type_collection( 

1508 specimen: Any, default: Optional[Type[Any]] = None 

1509) -> Optional[Type[Any]]: 

1510 """Given an instance or class, guess if it is or is acting as one of 

1511 the basic collection types: list, set and dict. If the __emulates__ 

1512 property is present, return that preferentially. 

1513 """ 

1514 

1515 if hasattr(specimen, "__emulates__"): 

1516 # canonicalize set vs sets.Set to a standard: the builtin set 

1517 if specimen.__emulates__ is not None and issubclass( 

1518 specimen.__emulates__, set 

1519 ): 

1520 return set 

1521 else: 

1522 return specimen.__emulates__ # type: ignore 

1523 

1524 isa = issubclass if isinstance(specimen, type) else isinstance 

1525 if isa(specimen, list): 

1526 return list 

1527 elif isa(specimen, set): 

1528 return set 

1529 elif isa(specimen, dict): 

1530 return dict 

1531 

1532 if hasattr(specimen, "append"): 

1533 return list 

1534 elif hasattr(specimen, "add"): 

1535 return set 

1536 elif hasattr(specimen, "set"): 

1537 return dict 

1538 else: 

1539 return default 

1540 

1541 

1542def assert_arg_type( 

1543 arg: Any, argtype: Union[Tuple[Type[Any], ...], Type[Any]], name: str 

1544) -> Any: 

1545 if isinstance(arg, argtype): 

1546 return arg 

1547 else: 

1548 if isinstance(argtype, tuple): 

1549 raise exc.ArgumentError( 

1550 "Argument '%s' is expected to be one of type %s, got '%s'" 

1551 % (name, " or ".join("'%s'" % a for a in argtype), type(arg)) 

1552 ) 

1553 else: 

1554 raise exc.ArgumentError( 

1555 "Argument '%s' is expected to be of type '%s', got '%s'" 

1556 % (name, argtype, type(arg)) 

1557 ) 

1558 

1559 

1560def dictlike_iteritems(dictlike): 

1561 """Return a (key, value) iterator for almost any dict-like object.""" 

1562 

1563 if hasattr(dictlike, "items"): 

1564 return list(dictlike.items()) 

1565 

1566 getter = getattr(dictlike, "__getitem__", getattr(dictlike, "get", None)) 

1567 if getter is None: 

1568 raise TypeError("Object '%r' is not dict-like" % dictlike) 

1569 

1570 if hasattr(dictlike, "iterkeys"): 

1571 

1572 def iterator(): 

1573 for key in dictlike.iterkeys(): 

1574 assert getter is not None 

1575 yield key, getter(key) 

1576 

1577 return iterator() 

1578 elif hasattr(dictlike, "keys"): 

1579 return iter((key, getter(key)) for key in dictlike.keys()) 

1580 else: 

1581 raise TypeError("Object '%r' is not dict-like" % dictlike) 

1582 

1583 

1584class classproperty(property): 

1585 """A decorator that behaves like @property except that operates 

1586 on classes rather than instances. 

1587 

1588 The decorator is currently special when using the declarative 

1589 module, but note that the 

1590 :class:`~.sqlalchemy.ext.declarative.declared_attr` 

1591 decorator should be used for this purpose with declarative. 

1592 

1593 """ 

1594 

1595 fget: Callable[[Any], Any] 

1596 

1597 def __init__(self, fget: Callable[[Any], Any], *arg: Any, **kw: Any): 

1598 super().__init__(fget, *arg, **kw) 

1599 self.__doc__ = fget.__doc__ 

1600 

1601 def __get__(self, obj: Any, cls: Optional[type] = None) -> Any: 

1602 return self.fget(cls) 

1603 

1604 

1605class hybridproperty(Generic[_T]): 

1606 def __init__(self, func: Callable[..., _T]): 

1607 self.func = func 

1608 self.clslevel = func 

1609 

1610 def __get__(self, instance: Any, owner: Any) -> _T: 

1611 if instance is None: 

1612 clsval = self.clslevel(owner) 

1613 return clsval 

1614 else: 

1615 return self.func(instance) 

1616 

1617 def classlevel(self, func: Callable[..., Any]) -> hybridproperty[_T]: 

1618 self.clslevel = func 

1619 return self 

1620 

1621 

1622class rw_hybridproperty(Generic[_T]): 

1623 def __init__(self, func: Callable[..., _T]): 

1624 self.func = func 

1625 self.clslevel = func 

1626 self.setfn: Optional[Callable[..., Any]] = None 

1627 

1628 def __get__(self, instance: Any, owner: Any) -> _T: 

1629 if instance is None: 

1630 clsval = self.clslevel(owner) 

1631 return clsval 

1632 else: 

1633 return self.func(instance) 

1634 

1635 def __set__(self, instance: Any, value: Any) -> None: 

1636 assert self.setfn is not None 

1637 self.setfn(instance, value) 

1638 

1639 def setter(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]: 

1640 self.setfn = func 

1641 return self 

1642 

1643 def classlevel(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]: 

1644 self.clslevel = func 

1645 return self 

1646 

1647 

1648class hybridmethod(Generic[_T]): 

1649 """Decorate a function as cls- or instance- level.""" 

1650 

1651 def __init__(self, func: Callable[..., _T]): 

1652 self.func = self.__func__ = func 

1653 self.clslevel = func 

1654 

1655 def __get__(self, instance: Any, owner: Any) -> Callable[..., _T]: 

1656 if instance is None: 

1657 return self.clslevel.__get__(owner, owner.__class__) # type:ignore 

1658 else: 

1659 return self.func.__get__(instance, owner) # type:ignore 

1660 

1661 def classlevel(self, func: Callable[..., Any]) -> hybridmethod[_T]: 

1662 self.clslevel = func 

1663 return self 

1664 

1665 

1666class symbol(int): 

1667 """A constant symbol. 

1668 

1669 >>> symbol("foo") is symbol("foo") 

1670 True 

1671 >>> symbol("foo") 

1672 <symbol 'foo> 

1673 

1674 A slight refinement of the MAGICCOOKIE=object() pattern. The primary 

1675 advantage of symbol() is its repr(). They are also singletons. 

1676 

1677 Repeated calls of symbol('name') will all return the same instance. 

1678 

1679 """ 

1680 

1681 name: str 

1682 

1683 symbols: Dict[str, symbol] = {} 

1684 _lock = threading.Lock() 

1685 

1686 def __new__( 

1687 cls, 

1688 name: str, 

1689 doc: Optional[str] = None, 

1690 canonical: Optional[int] = None, 

1691 ) -> symbol: 

1692 with cls._lock: 

1693 sym = cls.symbols.get(name) 

1694 if sym is None: 

1695 assert isinstance(name, str) 

1696 if canonical is None: 

1697 canonical = hash(name) 

1698 sym = int.__new__(symbol, canonical) 

1699 sym.name = name 

1700 if doc: 

1701 sym.__doc__ = doc 

1702 

1703 # NOTE: we should ultimately get rid of this global thing, 

1704 # however, currently it is to support pickling. The best 

1705 # change would be when we are on py3.11 at a minimum, we 

1706 # switch to stdlib enum.IntFlag. 

1707 cls.symbols[name] = sym 

1708 else: 

1709 if canonical and canonical != sym: 

1710 raise TypeError( 

1711 f"Can't replace canonical symbol for {name!r} " 

1712 f"with new int value {canonical}" 

1713 ) 

1714 return sym 

1715 

1716 def __reduce__(self): 

1717 return symbol, (self.name, "x", int(self)) 

1718 

1719 def __str__(self): 

1720 return repr(self) 

1721 

1722 def __repr__(self): 

1723 return f"symbol({self.name!r})" 

1724 

1725 

1726class _IntFlagMeta(type): 

1727 def __init__( 

1728 cls, 

1729 classname: str, 

1730 bases: Tuple[Type[Any], ...], 

1731 dict_: Dict[str, Any], 

1732 **kw: Any, 

1733 ) -> None: 

1734 items: List[symbol] 

1735 cls._items = items = [] 

1736 for k, v in dict_.items(): 

1737 if re.match(r"^__.*__$", k): 

1738 continue 

1739 if isinstance(v, int): 

1740 sym = symbol(k, canonical=v) 

1741 elif not k.startswith("_"): 

1742 raise TypeError("Expected integer values for IntFlag") 

1743 else: 

1744 continue 

1745 setattr(cls, k, sym) 

1746 items.append(sym) 

1747 

1748 cls.__members__ = _collections.immutabledict( 

1749 {sym.name: sym for sym in items} 

1750 ) 

1751 

1752 def __iter__(self) -> Iterator[symbol]: 

1753 raise NotImplementedError( 

1754 "iter not implemented to ensure compatibility with " 

1755 "Python 3.11 IntFlag. Please use __members__. See " 

1756 "https://github.com/python/cpython/issues/99304" 

1757 ) 

1758 

1759 

1760class _FastIntFlag(metaclass=_IntFlagMeta): 

1761 """An 'IntFlag' copycat that isn't slow when performing bitwise 

1762 operations. 

1763 

1764 the ``FastIntFlag`` class will return ``enum.IntFlag`` under TYPE_CHECKING 

1765 and ``_FastIntFlag`` otherwise. 

1766 

1767 """ 

1768 

1769 

1770if TYPE_CHECKING: 

1771 from enum import IntFlag 

1772 

1773 FastIntFlag = IntFlag 

1774else: 

1775 FastIntFlag = _FastIntFlag 

1776 

1777 

1778_E = TypeVar("_E", bound=enum.Enum) 

1779 

1780 

1781def parse_user_argument_for_enum( 

1782 arg: Any, 

1783 choices: Dict[_E, List[Any]], 

1784 name: str, 

1785 resolve_symbol_names: bool = False, 

1786) -> Optional[_E]: 

1787 """Given a user parameter, parse the parameter into a chosen value 

1788 from a list of choice objects, typically Enum values. 

1789 

1790 The user argument can be a string name that matches the name of a 

1791 symbol, or the symbol object itself, or any number of alternate choices 

1792 such as True/False/ None etc. 

1793 

1794 :param arg: the user argument. 

1795 :param choices: dictionary of enum values to lists of possible 

1796 entries for each. 

1797 :param name: name of the argument. Used in an :class:`.ArgumentError` 

1798 that is raised if the parameter doesn't match any available argument. 

1799 

1800 """ 

1801 for enum_value, choice in choices.items(): 

1802 if arg is enum_value: 

1803 return enum_value 

1804 elif resolve_symbol_names and arg == enum_value.name: 

1805 return enum_value 

1806 elif arg in choice: 

1807 return enum_value 

1808 

1809 if arg is None: 

1810 return None 

1811 

1812 raise exc.ArgumentError(f"Invalid value for '{name}': {arg!r}") 

1813 

1814 

1815_creation_order = 1 

1816 

1817 

1818def set_creation_order(instance: Any) -> None: 

1819 """Assign a '_creation_order' sequence to the given instance. 

1820 

1821 This allows multiple instances to be sorted in order of creation 

1822 (typically within a single thread; the counter is not particularly 

1823 threadsafe). 

1824 

1825 """ 

1826 global _creation_order 

1827 instance._creation_order = _creation_order 

1828 _creation_order += 1 

1829 

1830 

1831def warn_exception(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: 

1832 """executes the given function, catches all exceptions and converts to 

1833 a warning. 

1834 

1835 """ 

1836 try: 

1837 return func(*args, **kwargs) 

1838 except Exception: 

1839 warn("%s('%s') ignored" % sys.exc_info()[0:2]) 

1840 

1841 

1842def ellipses_string(value, len_=25): 

1843 try: 

1844 if len(value) > len_: 

1845 return "%s..." % value[0:len_] 

1846 else: 

1847 return value 

1848 except TypeError: 

1849 return value 

1850 

1851 

1852class _hash_limit_string(str): 

1853 """A string subclass that can only be hashed on a maximum amount 

1854 of unique values. 

1855 

1856 This is used for warnings so that we can send out parameterized warnings 

1857 without the __warningregistry__ of the module, or the non-overridable 

1858 "once" registry within warnings.py, overloading memory, 

1859 

1860 

1861 """ 

1862 

1863 _hash: int 

1864 

1865 def __new__( 

1866 cls, value: str, num: int, args: Sequence[Any] 

1867 ) -> _hash_limit_string: 

1868 interpolated = (value % args) + ( 

1869 " (this warning may be suppressed after %d occurrences)" % num 

1870 ) 

1871 self = super().__new__(cls, interpolated) 

1872 self._hash = hash("%s_%d" % (value, hash(interpolated) % num)) 

1873 return self 

1874 

1875 def __hash__(self) -> int: 

1876 return self._hash 

1877 

1878 def __eq__(self, other: Any) -> bool: 

1879 return hash(self) == hash(other) 

1880 

1881 

1882def warn(msg: str, code: Optional[str] = None) -> None: 

1883 """Issue a warning. 

1884 

1885 If msg is a string, :class:`.exc.SAWarning` is used as 

1886 the category. 

1887 

1888 """ 

1889 if code: 

1890 _warnings_warn(exc.SAWarning(msg, code=code)) 

1891 else: 

1892 _warnings_warn(msg, exc.SAWarning) 

1893 

1894 

1895def warn_limited(msg: str, args: Sequence[Any]) -> None: 

1896 """Issue a warning with a parameterized string, limiting the number 

1897 of registrations. 

1898 

1899 """ 

1900 if args: 

1901 msg = _hash_limit_string(msg, 10, args) 

1902 _warnings_warn(msg, exc.SAWarning) 

1903 

1904 

1905_warning_tags: Dict[CodeType, Tuple[str, Type[Warning]]] = {} 

1906 

1907 

1908def tag_method_for_warnings( 

1909 message: str, category: Type[Warning] 

1910) -> Callable[[_F], _F]: 

1911 def go(fn): 

1912 _warning_tags[fn.__code__] = (message, category) 

1913 return fn 

1914 

1915 return go 

1916 

1917 

1918_not_sa_pattern = re.compile(r"^(?:sqlalchemy\.(?!testing)|alembic\.)") 

1919 

1920 

1921def _warnings_warn( 

1922 message: Union[str, Warning], 

1923 category: Optional[Type[Warning]] = None, 

1924 stacklevel: int = 2, 

1925) -> None: 

1926 

1927 if category is None and isinstance(message, Warning): 

1928 category = type(message) 

1929 

1930 # adjust the given stacklevel to be outside of SQLAlchemy 

1931 try: 

1932 frame = sys._getframe(stacklevel) 

1933 except ValueError: 

1934 # being called from less than 3 (or given) stacklevels, weird, 

1935 # but don't crash 

1936 stacklevel = 0 

1937 except: 

1938 # _getframe() doesn't work, weird interpreter issue, weird, 

1939 # ok, but don't crash 

1940 stacklevel = 0 

1941 else: 

1942 stacklevel_found = warning_tag_found = False 

1943 while frame is not None: 

1944 # using __name__ here requires that we have __name__ in the 

1945 # __globals__ of the decorated string functions we make also. 

1946 # we generate this using {"__name__": fn.__module__} 

1947 if not stacklevel_found and not re.match( 

1948 _not_sa_pattern, frame.f_globals.get("__name__", "") 

1949 ): 

1950 # stop incrementing stack level if an out-of-SQLA line 

1951 # were found. 

1952 stacklevel_found = True 

1953 

1954 # however, for the warning tag thing, we have to keep 

1955 # scanning up the whole traceback 

1956 

1957 if frame.f_code in _warning_tags: 

1958 warning_tag_found = True 

1959 (_suffix, _category) = _warning_tags[frame.f_code] 

1960 category = category or _category 

1961 message = f"{message} ({_suffix})" 

1962 

1963 frame = frame.f_back # type: ignore[assignment] 

1964 

1965 if not stacklevel_found: 

1966 stacklevel += 1 

1967 elif stacklevel_found and warning_tag_found: 

1968 break 

1969 

1970 if category is not None: 

1971 warnings.warn(message, category, stacklevel=stacklevel + 1) 

1972 else: 

1973 warnings.warn(message, stacklevel=stacklevel + 1) 

1974 

1975 

1976def only_once( 

1977 fn: Callable[..., _T], retry_on_exception: bool 

1978) -> Callable[..., Optional[_T]]: 

1979 """Decorate the given function to be a no-op after it is called exactly 

1980 once.""" 

1981 

1982 once = [fn] 

1983 

1984 def go(*arg: Any, **kw: Any) -> Optional[_T]: 

1985 # strong reference fn so that it isn't garbage collected, 

1986 # which interferes with the event system's expectations 

1987 strong_fn = fn # noqa 

1988 if once: 

1989 once_fn = once.pop() 

1990 try: 

1991 return once_fn(*arg, **kw) 

1992 except: 

1993 if retry_on_exception: 

1994 once.insert(0, once_fn) 

1995 raise 

1996 

1997 return None 

1998 

1999 return go 

2000 

2001 

2002_SQLA_RE = re.compile(r"sqlalchemy/([a-z_]+/){0,2}[a-z_]+\.py") 

2003_UNITTEST_RE = re.compile(r"unit(?:2|test2?/)") 

2004 

2005 

2006def chop_traceback( 

2007 tb: List[str], 

2008 exclude_prefix: re.Pattern[str] = _UNITTEST_RE, 

2009 exclude_suffix: re.Pattern[str] = _SQLA_RE, 

2010) -> List[str]: 

2011 """Chop extraneous lines off beginning and end of a traceback. 

2012 

2013 :param tb: 

2014 a list of traceback lines as returned by ``traceback.format_stack()`` 

2015 

2016 :param exclude_prefix: 

2017 a regular expression object matching lines to skip at beginning of 

2018 ``tb`` 

2019 

2020 :param exclude_suffix: 

2021 a regular expression object matching lines to skip at end of ``tb`` 

2022 """ 

2023 start = 0 

2024 end = len(tb) - 1 

2025 while start <= end and exclude_prefix.search(tb[start]): 

2026 start += 1 

2027 while start <= end and exclude_suffix.search(tb[end]): 

2028 end -= 1 

2029 return tb[start : end + 1] 

2030 

2031 

2032def attrsetter(attrname): 

2033 code = "def set(obj, value): obj.%s = value" % attrname 

2034 env = locals().copy() 

2035 exec(code, env) 

2036 return env["set"] 

2037 

2038 

2039dunders_re = re.compile("^__.+__$") 

2040 

2041 

2042class TypingOnly: 

2043 """A mixin class that marks a class as 'typing only', meaning it has 

2044 absolutely no methods, attributes, or runtime functionality whatsoever. 

2045 

2046 """ 

2047 

2048 __slots__ = () 

2049 

2050 def __init_subclass__(cls, **kw: Any) -> None: 

2051 if TypingOnly in cls.__bases__: 

2052 remaining = { 

2053 name for name in cls.__dict__ if not dunders_re.match(name) 

2054 } 

2055 if remaining: 

2056 raise AssertionError( 

2057 f"Class {cls} directly inherits TypingOnly but has " 

2058 f"additional attributes {remaining}." 

2059 ) 

2060 super().__init_subclass__(**kw) 

2061 

2062 

2063class EnsureKWArg: 

2064 r"""Apply translation of functions to accept \**kw arguments if they 

2065 don't already. 

2066 

2067 Used to ensure cross-compatibility with third party legacy code, for things 

2068 like compiler visit methods that need to accept ``**kw`` arguments, 

2069 but may have been copied from old code that didn't accept them. 

2070 

2071 """ 

2072 

2073 ensure_kwarg: str 

2074 """a regular expression that indicates method names for which the method 

2075 should accept ``**kw`` arguments. 

2076 

2077 The class will scan for methods matching the name template and decorate 

2078 them if necessary to ensure ``**kw`` parameters are accepted. 

2079 

2080 """ 

2081 

2082 def __init_subclass__(cls) -> None: 

2083 fn_reg = cls.ensure_kwarg 

2084 clsdict = cls.__dict__ 

2085 if fn_reg: 

2086 for key in clsdict: 

2087 m = re.match(fn_reg, key) 

2088 if m: 

2089 fn = clsdict[key] 

2090 spec = compat.inspect_getfullargspec(fn) 

2091 if not spec.varkw: 

2092 wrapped = cls._wrap_w_kw(fn) 

2093 setattr(cls, key, wrapped) 

2094 super().__init_subclass__() 

2095 

2096 @classmethod 

2097 def _wrap_w_kw(cls, fn: Callable[..., Any]) -> Callable[..., Any]: 

2098 def wrap(*arg: Any, **kw: Any) -> Any: 

2099 return fn(*arg) 

2100 

2101 return update_wrapper(wrap, fn) 

2102 

2103 

2104def wrap_callable(wrapper, fn): 

2105 """Augment functools.update_wrapper() to work with objects with 

2106 a ``__call__()`` method. 

2107 

2108 :param fn: 

2109 object with __call__ method 

2110 

2111 """ 

2112 if hasattr(fn, "__name__"): 

2113 return update_wrapper(wrapper, fn) 

2114 else: 

2115 _f = wrapper 

2116 _f.__name__ = fn.__class__.__name__ 

2117 if hasattr(fn, "__module__"): 

2118 _f.__module__ = fn.__module__ 

2119 

2120 if hasattr(fn.__call__, "__doc__") and fn.__call__.__doc__: 

2121 _f.__doc__ = fn.__call__.__doc__ 

2122 elif fn.__doc__: 

2123 _f.__doc__ = fn.__doc__ 

2124 

2125 return _f 

2126 

2127 

2128def quoted_token_parser(value): 

2129 """Parse a dotted identifier with accommodation for quoted names. 

2130 

2131 Includes support for SQL-style double quotes as a literal character. 

2132 

2133 E.g.:: 

2134 

2135 >>> quoted_token_parser("name") 

2136 ["name"] 

2137 >>> quoted_token_parser("schema.name") 

2138 ["schema", "name"] 

2139 >>> quoted_token_parser('"Schema"."Name"') 

2140 ['Schema', 'Name'] 

2141 >>> quoted_token_parser('"Schema"."Name""Foo"') 

2142 ['Schema', 'Name""Foo'] 

2143 

2144 """ 

2145 

2146 if '"' not in value: 

2147 return value.split(".") 

2148 

2149 # 0 = outside of quotes 

2150 # 1 = inside of quotes 

2151 state = 0 

2152 result: List[List[str]] = [[]] 

2153 idx = 0 

2154 lv = len(value) 

2155 while idx < lv: 

2156 char = value[idx] 

2157 if char == '"': 

2158 if state == 1 and idx < lv - 1 and value[idx + 1] == '"': 

2159 result[-1].append('"') 

2160 idx += 1 

2161 else: 

2162 state ^= 1 

2163 elif char == "." and state == 0: 

2164 result.append([]) 

2165 else: 

2166 result[-1].append(char) 

2167 idx += 1 

2168 

2169 return ["".join(token) for token in result] 

2170 

2171 

2172def add_parameter_text(params: Any, text: str) -> Callable[[_F], _F]: 

2173 params = _collections.to_list(params) 

2174 

2175 def decorate(fn): 

2176 doc = fn.__doc__ is not None and fn.__doc__ or "" 

2177 if doc: 

2178 doc = inject_param_text(doc, {param: text for param in params}) 

2179 fn.__doc__ = doc 

2180 return fn 

2181 

2182 return decorate 

2183 

2184 

2185def _dedent_docstring(text: str) -> str: 

2186 split_text = text.split("\n", 1) 

2187 if len(split_text) == 1: 

2188 return text 

2189 else: 

2190 firstline, remaining = split_text 

2191 if not firstline.startswith(" "): 

2192 return firstline + "\n" + textwrap.dedent(remaining) 

2193 else: 

2194 return textwrap.dedent(text) 

2195 

2196 

2197def inject_docstring_text( 

2198 given_doctext: Optional[str], injecttext: str, pos: int 

2199) -> str: 

2200 doctext: str = _dedent_docstring(given_doctext or "") 

2201 lines = doctext.split("\n") 

2202 if len(lines) == 1: 

2203 lines.append("") 

2204 injectlines = textwrap.dedent(injecttext).split("\n") 

2205 if injectlines[0]: 

2206 injectlines.insert(0, "") 

2207 

2208 blanks = [num for num, line in enumerate(lines) if not line.strip()] 

2209 blanks.insert(0, 0) 

2210 

2211 inject_pos = blanks[min(pos, len(blanks) - 1)] 

2212 

2213 lines = lines[0:inject_pos] + injectlines + lines[inject_pos:] 

2214 return "\n".join(lines) 

2215 

2216 

2217_param_reg = re.compile(r"(\s+):param (.+?):") 

2218 

2219 

2220def inject_param_text(doctext: str, inject_params: Dict[str, str]) -> str: 

2221 doclines = collections.deque(doctext.splitlines()) 

2222 lines = [] 

2223 

2224 # TODO: this is not working for params like ":param case_sensitive=True:" 

2225 

2226 to_inject = None 

2227 while doclines: 

2228 line = doclines.popleft() 

2229 

2230 m = _param_reg.match(line) 

2231 

2232 if to_inject is None: 

2233 if m: 

2234 param = m.group(2).lstrip("*") 

2235 if param in inject_params: 

2236 # default indent to that of :param: plus one 

2237 indent = " " * len(m.group(1)) + " " 

2238 

2239 # but if the next line has text, use that line's 

2240 # indentation 

2241 if doclines: 

2242 m2 = re.match(r"(\s+)\S", doclines[0]) 

2243 if m2: 

2244 indent = " " * len(m2.group(1)) 

2245 

2246 to_inject = indent + inject_params[param] 

2247 elif m: 

2248 lines.extend(["\n", to_inject, "\n"]) 

2249 to_inject = None 

2250 elif not line.rstrip(): 

2251 lines.extend([line, to_inject, "\n"]) 

2252 to_inject = None 

2253 elif line.endswith("::"): 

2254 # TODO: this still won't cover if the code example itself has 

2255 # blank lines in it, need to detect those via indentation. 

2256 lines.extend([line, doclines.popleft()]) 

2257 continue 

2258 lines.append(line) 

2259 

2260 return "\n".join(lines) 

2261 

2262 

2263def repr_tuple_names(names: List[str]) -> Optional[str]: 

2264 """Trims a list of strings from the middle and return a string of up to 

2265 four elements. Strings greater than 11 characters will be truncated""" 

2266 if len(names) == 0: 

2267 return None 

2268 flag = len(names) <= 4 

2269 names = names[0:4] if flag else names[0:3] + names[-1:] 

2270 res = ["%s.." % name[:11] if len(name) > 11 else name for name in names] 

2271 if flag: 

2272 return ", ".join(res) 

2273 else: 

2274 return "%s, ..., %s" % (", ".join(res[0:3]), res[-1]) 

2275 

2276 

2277def has_compiled_ext(raise_=False): 

2278 from ._has_cython import HAS_CYEXTENSION 

2279 

2280 if HAS_CYEXTENSION: 

2281 return True 

2282 elif raise_: 

2283 raise ImportError( 

2284 "cython extensions were expected to be installed, " 

2285 "but are not present" 

2286 ) 

2287 else: 

2288 return False 

2289 

2290 

2291def load_uncompiled_module(module: _M) -> _M: 

2292 """Load the non-compied version of a module that is also 

2293 compiled with cython. 

2294 """ 

2295 full_name = module.__name__ 

2296 assert module.__spec__ 

2297 parent_name = module.__spec__.parent 

2298 assert parent_name 

2299 parent_module = sys.modules[parent_name] 

2300 assert parent_module.__spec__ 

2301 package_path = parent_module.__spec__.origin 

2302 assert package_path and package_path.endswith("__init__.py") 

2303 

2304 name = full_name.split(".")[-1] 

2305 module_path = package_path.replace("__init__.py", f"{name}.py") 

2306 

2307 py_spec = importlib.util.spec_from_file_location(full_name, module_path) 

2308 assert py_spec 

2309 py_module = importlib.util.module_from_spec(py_spec) 

2310 assert py_spec.loader 

2311 py_spec.loader.exec_module(py_module) 

2312 return cast(_M, py_module) 

2313 

2314 

2315class _Missing(enum.Enum): 

2316 Missing = enum.auto() 

2317 

2318 

2319Missing = _Missing.Missing 

2320MissingOr = Union[_T, Literal[_Missing.Missing]]