Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1032 statements  

1# util/langhelpers.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Routines to help with the creation, loading and introspection of 

10modules, classes, hierarchies, attributes, functions, and methods. 

11 

12""" 

13 

14from __future__ import annotations 

15 

16import collections 

17import enum 

18from functools import update_wrapper 

19import importlib.util 

20import inspect 

21import itertools 

22import operator 

23import re 

24import sys 

25import textwrap 

26import threading 

27import types 

28from types import CodeType 

29from types import ModuleType 

30from typing import Any 

31from typing import Callable 

32from typing import cast 

33from typing import Dict 

34from typing import FrozenSet 

35from typing import Generic 

36from typing import Iterator 

37from typing import List 

38from typing import Literal 

39from typing import NoReturn 

40from typing import Optional 

41from typing import overload 

42from typing import Sequence 

43from typing import Set 

44from typing import Tuple 

45from typing import Type 

46from typing import TYPE_CHECKING 

47from typing import TypeVar 

48from typing import Union 

49import warnings 

50 

51from . import _collections 

52from . import compat 

53from .. import exc 

54 

# Type variables used by the annotations in this module.
_T = TypeVar("_T")
# covariant variant of _T
_T_co = TypeVar("_T_co", covariant=True)
# restricted to callables of any signature
_F = TypeVar("_F", bound=Callable[..., Any])
# bound is given as a string forward reference
_MA = TypeVar("_MA", bound="HasMemoized.memoized_attribute[Any]")
# restricted to module objects
_M = TypeVar("_M", bound=ModuleType)

60 

61 

def restore_annotations(
    cls: type, new_annotations: dict[str, Any]
) -> Callable[[], None]:
    """Install alternate annotations on a class and return an "undo" callable.

    ``cls.__annotations__`` is replaced with ``new_annotations``.  Calling
    the returned function puts back the attribute that was memoized before
    the swap.

    This is used strictly to provide dataclasses on a mapped class, where
    in some cases we are making dataclass fields based on an attribute
    that is actually a python descriptor on a superclass which we called
    to get a value.  If dataclasses were to give us a way to achieve this
    without swapping ``__annotations__``, that would be much better.

    :param cls: the class whose annotations are temporarily replaced
    :param new_annotations: the annotations dictionary to install
    :return: a zero-argument callable that restores the memoized state
    """
    not_present = object()

    # pep-649 means classes have "__annotate__", and it's a callable.  If
    # it's there and is None, we're in "legacy future mode", where it's
    # python 3.14 or higher and "from __future__ import annotations" is set.
    # In "legacy future mode" we have to do the same steps we do for older
    # pythons; __annotate__ can be ignored.
    uses_annotate = getattr(cls, "__annotate__", None) is not None

    saved_name = "__annotate__" if uses_annotate else "__annotations__"
    saved_value = getattr(cls, saved_name, not_present)

    cls.__annotations__ = new_annotations

    def restore() -> None:
        # the attribute did not exist beforehand: remove it again
        if saved_value is not_present:
            delattr(cls, saved_name)
        else:
            setattr(cls, saved_name, saved_value)

    return restore

102 

103 

def md5_hex(x: Any) -> str:
    """Return the hex digest of ``x`` after encoding it as UTF-8.

    The hash object comes from ``compat.md5_not_for_security()``.
    """
    payload = x.encode("utf-8")
    hasher = compat.md5_not_for_security()
    hasher.update(payload)
    return cast(str, hasher.hexdigest())

109 

110 

class safe_reraise:
    """Context manager that always re-raises once handler code has run.

    The exception being handled when the block is entered is captured with
    ``sys.exc_info()`` so that it is maintained across a potential
    coroutine context switch.  When the block finishes cleanly, that
    captured exception is raised again; when the block itself raises, the
    new exception is raised instead.

    e.g.::

        try:
            sess.commit()
        except:
            with safe_reraise():
                sess.rollback()

    TODO: we should at some point evaluate current behaviors in this regard
    based on current greenlet, gevent/eventlet implementations in Python 3,
    and also see the degree to which our own asyncio (based on greenlet
    also) is impacted by this.  .rollback() will cause IO / context switch
    to occur in all these scenarios; what happens to the exception context
    from an "except:" block if we don't explicitly store it?  Original issue
    was #2703.

    """

    __slots__ = ("_exc_info",)

    # result of sys.exc_info() at __enter__ time; reset to None in __exit__
    _exc_info: Union[
        None,
        Tuple[
            Type[BaseException],
            BaseException,
            types.TracebackType,
        ],
        Tuple[None, None, None],
    ]

    def __enter__(self) -> None:
        self._exc_info = sys.exc_info()

    def __exit__(
        self,
        type_: Optional[Type[BaseException]],
        value: Optional[BaseException],
        traceback: Optional[types.TracebackType],
    ) -> NoReturn:
        assert self._exc_info is not None
        # see #2703 for notes
        if type_ is not None:
            # the handler block itself failed; that error wins
            self._exc_info = None  # remove potential circular references
            assert value is not None
            raise value.with_traceback(traceback)

        # the handler block completed; bring back the captured exception
        _, captured, captured_tb = self._exc_info
        assert captured is not None
        self._exc_info = None  # remove potential circular references
        raise captured.with_traceback(captured_tb)

168 

169 

def walk_subclasses(cls: Type[_T]) -> Iterator[Type[_T]]:
    """Yield ``cls`` and every class reachable through ``__subclasses__()``.

    Each class is yielded once, even when it can be reached along more
    than one inheritance path.
    """
    visited: Set[Any] = set()
    pending = [cls]

    while pending:
        current = pending.pop()
        if current in visited:
            continue

        visited.add(current)
        pending.extend(current.__subclasses__())
        yield current

182 

183 

def string_or_unprintable(element: Any) -> str:
    """Return ``element`` as a string without letting ``str()`` failures out.

    Strings are returned unchanged.  For anything else ``str(element)`` is
    attempted; if that raises, a message built from the element's ``repr``
    is returned instead.
    """
    if isinstance(element, str):
        return element

    try:
        return str(element)
    except Exception:
        return "unprintable element %r" % element

192 

193 

def clsname_as_plain_name(
    cls: Type[Any], use_name: Optional[str] = None
) -> str:
    """Turn a CamelCase class name into lower-case, space-separated words.

    ``use_name`` takes the place of ``cls.__name__`` when it is given and
    non-empty.  Words are capitalized runs (one upper-case letter followed
    by lower-case letters) plus the literal token ``SQL``.
    """
    source_name = use_name or cls.__name__
    words = re.findall(r"([A-Z][a-z]+|SQL)", source_name)
    return " ".join(word.lower() for word in words)

199 

200 

def method_is_overridden(
    instance_or_cls: Union[Type[Any], object],
    against_method: Callable[..., Any],
) -> bool:
    """Return True if the two class methods don't match.

    The attribute named ``against_method.__name__`` is looked up on the
    class (the class of ``instance_or_cls`` when an instance is passed) and
    compared to ``against_method`` with ``!=``.
    """
    owner = (
        instance_or_cls
        if isinstance(instance_or_cls, type)
        else instance_or_cls.__class__
    )
    current_method: types.MethodType = getattr(
        owner, against_method.__name__
    )
    return current_method != against_method

217 

218 

def decode_slice(slc: slice) -> Tuple[Any, ...]:
    """Decode a slice object as sent to ``__getitem__``.

    Returns ``(start, stop, step)``, where any member that provides an
    ``__index__()`` method has been replaced by the result of calling it.

    """
    return tuple(
        part.__index__() if hasattr(part, "__index__") else part
        for part in (slc.start, slc.stop, slc.step)
    )

231 

232 

233def _unique_symbols(used: Sequence[str], *bases: str) -> Iterator[str]: 

234 used_set = set(used) 

235 for base in bases: 

236 pool = itertools.chain( 

237 (base,), 

238 map(lambda i: base + str(i), range(1000)), 

239 ) 

240 for sym in pool: 

241 if sym not in used_set: 

242 used_set.add(sym) 

243 yield sym 

244 break 

245 else: 

246 raise NameError("exhausted namespace for symbol base %s" % base) 

247 

248 

def map_bits(fn: Callable[[int], Any], n: int) -> Iterator[Any]:
    """Call the given function given each nonzero bit from n.

    Bits are visited from least to most significant; ``fn`` receives the
    value of the bit (1, 2, 4, ...) and its result is yielded.
    """
    remaining = n
    while remaining:
        # two's-complement trick: isolates the lowest set bit
        lowest = remaining & -remaining
        yield fn(lowest)
        remaining ^= lowest

256 

257 

# Callable-bound type variable; decorator() in this module is annotated as
# returning Callable[[_Fn], _Fn].
_Fn = TypeVar("_Fn", bound="Callable[..., Any]")

# this seems to be in flux in recent mypy versions

261 

262 

def decorator(target: Callable[..., Any]) -> Callable[[_Fn], _Fn]:
    """A signature-matching decorator factory.

    The returned ``decorate(fn)`` generates a wrapper by exec'ing a small
    ``def`` whose argument list mirrors that of ``fn``.  The wrapper calls
    ``target(fn, <arguments>)``; when ``fn`` is a coroutine function the
    wrapper is ``async def`` and awaits ``target``.

    :param target: callable invoked as ``target(fn, *args, **kw)``
    :return: a decorator for plain functions and methods
    """

    def decorate(fn: _Fn) -> _Fn:
        if not (inspect.isfunction(fn) or inspect.ismethod(fn)):
            raise Exception("not a decoratable function")

        # Python 3.14 defers creating __annotations__ until it's used.
        # We do not want to create __annotations__ now, so __annotate__ is
        # blanked out while the argspec is read and then put back.
        saved_annotate = getattr(fn, "__annotate__", None)
        if saved_annotate is None:
            spec = compat.inspect_getfullargspec(fn)
        else:
            fn.__annotate__ = None  # type: ignore[union-attr]
            try:
                spec = compat.inspect_getfullargspec(fn)
            finally:
                fn.__annotate__ = saved_annotate  # type: ignore[union-attr]

        # Do not generate code for annotations.
        # update_wrapper() copies the annotation from fn to decorated.
        # We use dummy defaults for code generation to avoid having
        # copy of large globals for compiling.
        # We copy __defaults__ and __kwdefaults__ from fn to decorated.
        spec = spec._replace(
            annotations={},
            defaults=(None,) * len(spec.defaults or ()),
            kwonlydefaults=dict.fromkeys(spec.kwonlydefaults or ()),
        )

        # every name the generated function must not collide with:
        # positional names, the varargs / varkw names, and fn's own name
        reserved: Tuple[Any, ...] = (*spec[0], *spec[1:3], fn.__name__)
        targ_name, fn_name = _unique_symbols(reserved, "target", "fn")

        metadata: Dict[str, Optional[str]] = {
            "target": targ_name,
            "fn": fn_name,
        }
        metadata.update(format_argspec_plus(spec, grouped=False))
        metadata["name"] = fn.__name__

        is_coroutine = inspect.iscoroutinefunction(fn)
        metadata["prefix"] = "async " if is_coroutine else ""
        metadata["target_prefix"] = "await " if is_coroutine else ""

        # look for __ positional arguments.  This is a convention in
        # SQLAlchemy that arguments should be passed positionally
        # rather than as keyword
        # arguments.  note that apply_pos doesn't currently work in all cases
        # such as when a kw-only indicator "*" is present, which is why
        # we limit the use of this to just that case we can detect.  As we add
        # more kinds of methods that use @decorator, things may have to
        # be further improved in this area
        if "__" in repr(spec[0]):
            template = """\
%(prefix)sdef %(name)s%(grouped_args)s:
    return %(target_prefix)s%(target)s(%(fn)s, %(apply_pos)s)
"""
        else:
            template = """\
%(prefix)sdef %(name)s%(grouped_args)s:
    return %(target_prefix)s%(target)s(%(fn)s, %(apply_kw)s)
"""
        code = template % metadata

        env: Dict[str, Any] = {
            targ_name: target,
            fn_name: fn,
            "__name__": fn.__module__,
        }

        decorated = cast(
            types.FunctionType,
            _exec_code_in_env(code, env, fn.__name__),
        )
        # swap the placeholder defaults for the real ones
        decorated.__defaults__ = fn.__defaults__
        decorated.__kwdefaults__ = fn.__kwdefaults__  # type: ignore
        return update_wrapper(decorated, fn)  # type: ignore[return-value]

    return update_wrapper(decorate, target)  # type: ignore[return-value]

347 

348 

349def _exec_code_in_env( 

350 code: Union[str, types.CodeType], env: Dict[str, Any], fn_name: str 

351) -> Callable[..., Any]: 

352 exec(code, env) 

353 return env[fn_name] # type: ignore[no-any-return] 

354 

355 

# Unconstrained type variables.
_PF = TypeVar("_PF")
_TE = TypeVar("_TE")

358 

359 

class PluginLoader:
    """Registry that resolves plugin names to loaded objects.

    Loader callables are cached in ``impls``.  Unknown names are resolved
    first through the optional ``auto_fn`` and then through the entries
    returned by ``compat.importlib_metadata_get(group)``.
    """

    def __init__(
        self, group: str, auto_fn: Optional[Callable[..., Any]] = None
    ):
        self.group = group
        # plugin name -> zero-argument callable producing the plugin
        self.impls: Dict[str, Any] = {}
        self.auto_fn = auto_fn

    def clear(self):
        """Forget every cached or registered loader."""
        self.impls.clear()

    def load(self, name: str) -> Any:
        """Return the plugin registered under ``name``.

        :raises NoSuchModuleError: if no loader can be found for ``name``
        """
        if name in self.impls:
            return self.impls[name]()

        if self.auto_fn:
            auto_loader = self.auto_fn(name)
            if auto_loader:
                self.impls[name] = auto_loader
                return auto_loader()

        for entry in compat.importlib_metadata_get(self.group):
            if entry.name != name:
                continue
            self.impls[name] = entry.load
            return entry.load()

        raise exc.NoSuchModuleError(
            "Can't load plugin: %s:%s" % (self.group, name)
        )

    def register(self, name: str, modulepath: str, objname: str) -> None:
        """Register ``objname`` from module ``modulepath`` under ``name``.

        Nothing is imported until the plugin is actually loaded.
        """

        def load():
            # __import__ returns the top-level package; walk down the
            # remaining dotted path to reach the target module
            module = __import__(modulepath)
            for part in modulepath.split(".")[1:]:
                module = getattr(module, part)
            return getattr(module, objname)

        self.impls[name] = load

    def deregister(self, name: str) -> None:
        """Remove the loader for ``name``; KeyError if it is not present."""
        del self.impls[name]

401 

402 

403def _inspect_func_args(fn): 

404 try: 

405 co_varkeywords = inspect.CO_VARKEYWORDS 

406 except AttributeError: 

407 # https://docs.python.org/3/library/inspect.html 

408 # The flags are specific to CPython, and may not be defined in other 

409 # Python implementations. Furthermore, the flags are an implementation 

410 # detail, and can be removed or deprecated in future Python releases. 

411 spec = compat.inspect_getfullargspec(fn) 

412 return spec[0], bool(spec[2]) 

413 else: 

414 # use fn.__code__ plus flags to reduce method call overhead 

415 co = fn.__code__ 

416 nargs = co.co_argcount 

417 return ( 

418 list(co.co_varnames[:nargs]), 

419 bool(co.co_flags & co_varkeywords), 

420 ) 

421 

422 

@overload
def get_cls_kwargs(
    cls: type,
    *,
    _set: Optional[Set[str]] = None,
    raiseerr: Literal[True] = ...,
) -> Set[str]: ...


@overload
def get_cls_kwargs(
    cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False
) -> Optional[Set[str]]: ...


def get_cls_kwargs(
    cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False
) -> Optional[Set[str]]:
    r"""Return the full set of inherited kwargs for the given `cls`.

    Probes a class's __init__ method, collecting all named arguments.  If the
    __init__ defines a \**kwargs catch-all, then the constructor is presumed
    to pass along unrecognized keywords to its base classes, and the
    collection process is repeated recursively on each of the bases.  A class
    that does not define its own Python-level __init__ is treated the same
    way.

    Uses a subset of inspect.getfullargspec() to cut down on method overhead,
    as this is used within the Core typing system to create copies of type
    objects which is a performance-sensitive operation.

    No anonymous tuple arguments please !

    :param cls: the class to inspect
    :param _set: accumulator used by the recursive calls; leave unset
    :param raiseerr: raise TypeError instead of returning None when a
     non-toplevel ``__init__`` has no ``**kwargs``
    :return: the collected names without ``self``, or None

    """
    toplevel = _set is None
    collected: Set[str] = set() if _set is None else _set

    ctr = cls.__dict__.get("__init__", False)

    # only an __init__ written in Python on this very class is inspected
    has_init = isinstance(ctr, types.FunctionType) and isinstance(
        ctr.__code__, types.CodeType
    )

    has_kw = False
    if has_init:
        names, has_kw = _inspect_func_args(ctr)
        collected.update(names)

        if not (has_kw or toplevel):
            # a base class with a closed signature ends the collection
            if raiseerr:
                raise TypeError(
                    f"given cls {cls} doesn't have an __init__ method"
                )
            return None

    if has_kw or not has_init:
        for base in cls.__bases__:
            if get_cls_kwargs(base, _set=collected) is None:
                break

    collected.discard("self")
    return collected

489 

490 

def get_func_kwargs(func: Callable[..., Any]) -> List[str]:
    """Return the positional argument names of the given `func`.

    This is the first member of the argspec produced by
    ``compat.inspect_getfullargspec()``.

    """
    spec = compat.inspect_getfullargspec(func)
    return spec[0]

500 

501 

def get_callable_argspec(
    fn: Callable[..., Any], no_self: bool = False, _is_init: bool = False
) -> compat.FullArgSpec:
    """Return the argument signature for any callable.

    All pure-Python callables are accepted, including
    functions, methods, classes, objects with __call__;
    builtins and other edge cases like functools.partial() objects
    raise a TypeError.

    :param fn: the callable to inspect
    :param no_self: drop the leading ``self`` argument for ``__init__``
     functions and for methods
    :param _is_init: set by the recursive call made for a class, where
     ``fn`` is that class's ``__init__``
    :raises TypeError: for builtins and callables that can't be inspected

    """

    def without_first_arg(spec: compat.FullArgSpec) -> compat.FullArgSpec:
        # same spec, minus the first positional argument
        return compat.FullArgSpec(spec.args[1:], *spec[1:])

    if inspect.isbuiltin(fn):
        raise TypeError("Can't inspect builtin: %s" % fn)

    if inspect.isfunction(fn):
        spec = compat.inspect_getfullargspec(fn)
        return without_first_arg(spec) if _is_init and no_self else spec

    if inspect.ismethod(fn):
        drop_self = bool(no_self and (_is_init or fn.__self__))
        spec = compat.inspect_getfullargspec(fn.__func__)
        return without_first_arg(spec) if drop_self else spec

    if inspect.isclass(fn):
        return get_callable_argspec(
            fn.__init__, no_self=no_self, _is_init=True
        )

    if hasattr(fn, "__func__"):
        return compat.inspect_getfullargspec(fn.__func__)

    if hasattr(fn, "__call__") and inspect.ismethod(fn.__call__):
        return get_callable_argspec(fn.__call__, no_self=no_self)

    raise TypeError("Can't inspect callable: %s" % fn)

556 

557 

def format_argspec_plus(
    fn: Union[Callable[..., Any], compat.FullArgSpec], grouped: bool = True
) -> Dict[str, Optional[str]]:
    """Returns a dictionary of formatted, introspected function arguments.

    An enhanced variant of inspect.formatargspec to support code generation.

    fn
       An inspectable callable or tuple of inspect getargspec() results.
    grouped
      Defaults to True; include (parens, around, argument) lists

    Returns:

    grouped_args
      Full inspect.formatargspec for fn
    self_arg
      The name of the first positional argument, varargs[0], or None
      if the function defines no positional arguments.
    apply_pos
      args, re-written in calling rather than receiving syntax.  Arguments are
      passed positionally.
    apply_kw
      Like apply_pos, except keyword-ish args are passed as keywords.
    apply_pos_proxied
      Like apply_pos but omits the self/cls argument
    apply_kw_proxied
      Like apply_kw but omits the self/cls argument

    Example::

      >>> format_argspec_plus(lambda self, a, b, c=3, **d: 123)
      {'grouped_args': '(self, a, b, c=3, **d)',
       'self_arg': 'self',
       'apply_kw': '(self, a, b, c=c, **d)',
       'apply_pos': '(self, a, b, c, **d)'}

    (the two ``*_proxied`` keys are part of the result as well)

    """
    spec = compat.inspect_getfullargspec(fn) if callable(fn) else fn

    args = compat.inspect_formatargspec(*spec)

    positional, varargs, varkw, kwonly = spec[0], spec[1], spec[2], spec[4]

    apply_pos = compat.inspect_formatargspec(
        positional, varargs, varkw, None, kwonly
    )

    self_arg: Optional[str]
    if positional:
        self_arg = positional[0]
        apply_pos_proxied = compat.inspect_formatargspec(
            positional[1:], varargs, varkw, None, kwonly
        )
    else:
        # no named positional argument: point at the first member of
        # *varargs when there is one
        self_arg = ("%s[0]" % varargs) if varargs else None
        apply_pos_proxied = apply_pos

    # trailing entries of name_args that are rendered as "name=name"
    num_defaults = len(spec[3] or ()) + len(kwonly or ())

    name_args = positional + kwonly

    defaulted_vals: Union[List[str], Tuple[()]]
    defaulted_vals = name_args[-num_defaults:] if num_defaults else ()

    def as_keyword(value: Any) -> str:
        return "=" + str(value)

    apply_kw = compat.inspect_formatargspec(
        name_args,
        varargs,
        varkw,
        defaulted_vals,
        formatvalue=as_keyword,
    )

    if positional:
        apply_kw_proxied = compat.inspect_formatargspec(
            name_args[1:],
            varargs,
            varkw,
            defaulted_vals,
            formatvalue=as_keyword,
        )
    else:
        apply_kw_proxied = apply_kw

    calling_forms = {
        "apply_pos": apply_pos,
        "apply_kw": apply_kw,
        "apply_pos_proxied": apply_pos_proxied,
        "apply_kw_proxied": apply_kw_proxied,
    }
    if not grouped:
        # strip the enclosing parenthesis from the calling forms
        calling_forms = {
            key: text[1:-1] for key, text in calling_forms.items()
        }

    return dict(grouped_args=args, self_arg=self_arg, **calling_forms)

673 

674 

def format_argspec_init(method, grouped=True):
    """format_argspec_plus with considerations for typical __init__ methods

    Wraps format_argspec_plus with error handling strategies for typical
    __init__ cases:

    .. sourcecode:: text

      object.__init__ -> (self)
      other unreflectable (usually C) -> (self, *args, **kwargs)

    """
    if method is not object.__init__:
        try:
            return format_argspec_plus(method, grouped=grouped)
        except TypeError:
            # could not be reflected; assume the most permissive signature
            grouped_args = "(self, *args, **kwargs)"
            bare_args = "self, *args, **kwargs"
            proxied_grouped = "(*args, **kwargs)"
            proxied_bare = "*args, **kwargs"
    else:
        grouped_args = "(self)"
        bare_args = "self"
        proxied_grouped = "()"
        proxied_bare = ""

    args = grouped_args if grouped else bare_args
    proxied = proxied_grouped if grouped else proxied_bare

    return dict(
        self_arg="self",
        grouped_args=grouped_args,
        apply_pos=args,
        apply_kw=args,
        apply_pos_proxied=proxied,
        apply_kw_proxied=proxied,
    )

706 

707 

def create_proxy_methods(
    target_cls: Type[Any],
    target_cls_sphinx_name: str,
    proxy_cls_sphinx_name: str,
    classmethods: Sequence[str] = (),
    methods: Sequence[str] = (),
    attributes: Sequence[str] = (),
    use_intermediate_variable: Sequence[str] = (),
) -> Callable[[_T], _T]:
    """A class decorator indicating attributes should refer to a proxy
    class.

    At runtime this is only a "marker": none of the arguments are used and
    the decorated class is handed back unchanged.  The arguments are
    consumed by the tools/generate_proxy_methods.py script, which
    statically generates proxy methods and attributes that are fully
    recognized by typing tools such as mypy.

    """

    def decorate(cls):
        # intentionally a no-op
        return cls

    return decorate

731 

732 

def getargspec_init(method):
    """inspect.getargspec with considerations for typical __init__ methods

    Wraps ``compat.inspect_getfullargspec`` with a fallback for the case
    where it raises TypeError:

    .. sourcecode:: text

      object.__init__ -> (self)
      other unreflectable (usually C) -> (self, *args, **kwargs)

    The fallback value is a plain 4-tuple of
    ``(args, varargs, varkw, defaults)``.

    """
    try:
        return compat.inspect_getfullargspec(method)
    except TypeError:
        is_object_init = method is object.__init__

    if is_object_init:
        return (["self"], None, None, None)
    return (["self"], "args", "kwargs", None)

751 

752 

def unbound_method_to_callable(func_or_cls):
    """Adjust the incoming callable such that a 'self' argument is not
    required.

    A ``types.MethodType`` whose ``__self__`` is falsy is replaced by its
    underlying ``__func__``; anything else is returned as given.

    """
    lacks_self = (
        isinstance(func_or_cls, types.MethodType)
        and not func_or_cls.__self__
    )
    return func_or_cls.__func__ if lacks_self else func_or_cls

763 

764 

class GenericRepr:
    """Encapsulates the logic for creating a generic __repr__() string.

    This class allows for the repr structure to be created, then modified
    (e.g., changing the class name), before being rendered as a string.

    .. versionadded:: 2.1
    """

    __slots__ = (
        "_obj",
        "_additional_kw",
        "_to_inspect",
        "_omit_kwarg",
        "_class_name",
    )

    _obj: Any
    _additional_kw: Sequence[Tuple[str, Any]]
    _to_inspect: List[object]
    _omit_kwarg: Sequence[str]
    _class_name: Optional[str]

    def __init__(
        self,
        obj: Any,
        additional_kw: Sequence[Tuple[str, Any]] = (),
        to_inspect: Optional[Union[object, List[object]]] = None,
        omit_kwarg: Sequence[str] = (),
    ):
        """Create a GenericRepr object.

        :param obj: The object being repr'd
        :param additional_kw: Additional keyword arguments to check for in
          the repr, as a sequence of 2-tuples of (name, default_value)
        :param to_inspect: One or more objects whose __init__ signature
          should be inspected.  If not provided, defaults to [obj].
        :param omit_kwarg: Sequence of keyword argument names to omit from
          the repr output
        """
        self._obj = obj
        self._additional_kw = additional_kw
        self._to_inspect = (
            [obj] if to_inspect is None else _collections.to_list(to_inspect)
        )
        self._omit_kwarg = omit_kwarg
        self._class_name = None

    def set_class_name(self, class_name: str) -> GenericRepr:
        """Set the class name to be used in the repr.

        By default, the class name is taken from obj.__class__.__name__.
        This method allows it to be overridden.

        :param class_name: The class name to use
        :return: self, for method chaining
        """
        self._class_name = class_name
        return self

    def __str__(self) -> str:
        """Produce the __repr__() string based on the configured parameters.

        Arguments of the first inspected ``__init__`` that have no default
        are rendered positionally.  Its defaulted arguments, and all
        arguments of the remaining inspected objects, are rendered as
        ``name=value`` when the attribute exists on the object and differs
        from the default.
        """
        obj = self._obj
        to_inspect = self._to_inspect
        additional_kw = self._additional_kw
        omit_kwarg = self._omit_kwarg

        # stand-in default for arguments that have none, so that they are
        # rendered whenever the attribute is present
        missing = object()

        pos_args: List[str] = []
        kw_args: _collections.OrderedDict[str, Any] = (
            _collections.OrderedDict()
        )
        vargs = None
        for i, insp in enumerate(to_inspect):
            try:
                spec = compat.inspect_getfullargspec(insp.__init__)  # type: ignore[misc]  # noqa: E501
            except TypeError:
                continue

            default_len = len(spec.defaults) if spec.defaults else 0

            # Arguments after "self" that have no default.  The end of the
            # slice is computed explicitly: a ``-default_len`` bound would
            # turn into ``[1:0]`` (empty) when there are no defaults and
            # drop every argument.
            required = spec.args[1 : len(spec.args) - default_len]

            if i == 0:
                if spec.varargs:
                    vargs = spec.varargs
                pos_args.extend(required)
            else:
                kw_args.update([(arg, missing) for arg in required])

            if default_len:
                assert spec.defaults
                kw_args.update(
                    [
                        (arg, default)
                        for arg, default in zip(
                            spec.args[-default_len:], spec.defaults
                        )
                    ]
                )

        output: List[str] = []

        output.extend(repr(getattr(obj, arg, None)) for arg in pos_args)

        if vargs is not None and hasattr(obj, vargs):
            output.extend([repr(val) for val in getattr(obj, vargs)])

        for arg, defval in kw_args.items():
            if arg in omit_kwarg:
                continue
            # best effort: attribute access or comparison that raises
            # just leaves the argument out of the repr
            try:
                val = getattr(obj, arg, missing)
                if val is not missing and val != defval:
                    output.append("%s=%r" % (arg, val))
            except Exception:
                pass

        if additional_kw:
            for arg, defval in additional_kw:
                try:
                    val = getattr(obj, arg, missing)
                    if val is not missing and val != defval:
                        output.append("%s=%r" % (arg, val))
                except Exception:
                    pass

        class_name = (
            self._class_name
            if self._class_name is not None
            else obj.__class__.__name__
        )
        return "%s(%s)" % (class_name, ", ".join(output))

900 

901 

def generic_repr(
    obj: Any,
    additional_kw: Sequence[Tuple[str, Any]] = (),
    to_inspect: Optional[Union[object, List[object]]] = None,
    omit_kwarg: Sequence[str] = (),
) -> str:
    """Produce a __repr__() based on direct association of the __init__()
    specification vs. same-named attributes present.

    Shorthand for rendering a :class:`.GenericRepr` built from the same
    arguments.

    """
    builder = GenericRepr(
        obj,
        additional_kw=additional_kw,
        to_inspect=to_inspect,
        omit_kwarg=omit_kwarg,
    )
    return str(builder)

920 

921 

def class_hierarchy(cls):
    """Return an unordered sequence of all classes related to cls.

    Traverses diamond hierarchies.

    Fibs slightly: subclasses of builtin types are not returned.  Thus
    class_hierarchy(class A(object)) returns (A, object), not A plus every
    class systemwide that derives from object.

    """

    related = {cls}
    pending = list(cls.__mro__)

    while pending:
        current = pending.pop()

        # upward: direct bases not seen so far
        for base in current.__bases__:
            if base not in related:
                pending.append(base)
                related.add(base)

        if current.__module__ == "builtins" or not hasattr(
            current, "__subclasses__"
        ):
            continue

        # downward: for a metaclass, __subclasses__ is found unbound and
        # has to be given the class explicitly
        if issubclass(current, type):
            subclasses = current.__subclasses__(current)
        else:
            subclasses = current.__subclasses__()

        unseen = [sub for sub in subclasses if sub not in related]
        for sub in unseen:
            pending.append(sub)
            related.add(sub)

    return list(related)

958 

959 

def iterate_attributes(cls):
    """Iterate all the keys and attributes associated
    with a class, without using getattr().

    Yields ``(key, value)`` for each name in ``dir(cls)``, with the value
    read from the ``__dict__`` of the first class in the MRO that defines
    the name.  getattr() is avoided so that class-sensitive descriptors
    (i.e. property.__get__()) are not called.

    """
    for key in dir(cls):
        owner = next(
            (klass for klass in cls.__mro__ if key in klass.__dict__),
            None,
        )
        if owner is not None:
            yield (key, owner.__dict__[key])

973 break 

974 

975 

def monkeypatch_proxied_specials(
    into_cls,
    from_cls,
    skip=None,
    only=None,
    name="self.proxy",
    from_instance=None,
):
    """Automates delegation of __specials__ for a proxying type.

    For each selected dunder method of ``from_cls``, a forwarding function
    is generated with ``exec`` and set on ``into_cls``.  The generated body
    is ``return <name>.<method>(<args>)``.

    :param into_cls: class that receives the generated methods
    :param from_cls: class whose dunder methods are delegated to
    :param skip: names to leave out when the list is derived from
     ``from_cls``; a built-in list is used when omitted
    :param only: explicit list of method names; when given and non-empty,
     ``skip`` and the "not already on ``into_cls``" check don't apply
    :param name: expression placed before ``.<method>`` in generated code
    :param from_instance: when not None, made available to the generated
     code as a global stored under the key ``name``
    """

    if only:
        dunders = only
    else:
        if skip is None:
            skip = (
                "__slots__",
                "__del__",
                "__getattribute__",
                "__metaclass__",
                "__getstate__",
                "__setstate__",
            )
        dunders = [
            attr
            for attr in dir(from_cls)
            if attr.startswith("__")
            and attr.endswith("__")
            and not hasattr(into_cls, attr)
            and attr not in skip
        ]

    for method in dunders:
        try:
            maybe_fn = getattr(from_cls, method)
            if not hasattr(maybe_fn, "__call__"):
                continue
            maybe_fn = getattr(maybe_fn, "__func__", maybe_fn)
        except AttributeError:
            continue
        fn = cast(types.FunctionType, maybe_fn)

        try:
            spec = compat.inspect_getfullargspec(fn)
            fn_args = compat.inspect_formatargspec(spec[0])
            d_args = compat.inspect_formatargspec(spec[0][1:])
        except TypeError:
            # not reflectable; forward everything
            fn_args = "(self, *args, **kw)"
            d_args = "(*args, **kw)"

        substitutions = {
            "method": method,
            "fn_args": fn_args,
            "name": name,
            "d_args": d_args,
        }
        py = (
            "def %(method)s%(fn_args)s: "
            "return %(name)s.%(method)s%(d_args)s" % substitutions
        )

        env: Dict[str, types.FunctionType] = (
            {name: from_instance} if from_instance is not None else {}
        )
        exec(py, env)
        generated = env[method]
        try:
            generated.__defaults__ = fn.__defaults__
        except AttributeError:
            pass
        setattr(into_cls, method, generated)

1041 

1042 

def methods_equivalent(meth1, meth2):
    """Return True if the two methods are the same implementation.

    Each argument is unwrapped through ``__func__`` when it has one, and
    the results are compared by identity.
    """
    impl1 = getattr(meth1, "__func__", meth1)
    impl2 = getattr(meth2, "__func__", meth2)
    return impl1 is impl2

1049 

1050 

def as_interface(obj, cls=None, methods=None, required=None):
    """Ensure basic interface compliance for an instance or dict of callables.

    Checks that ``obj`` implements public methods of ``cls`` or has members
    listed in ``methods``. If ``required`` is not supplied, implementing at
    least one interface method is sufficient. Methods present on ``obj`` that
    are not in the interface are ignored.

    If ``obj`` is a dict and ``dict`` does not meet the interface
    requirements, the keys of the dictionary are inspected. Keys present in
    ``obj`` that are not in the interface will raise TypeErrors.

    Raises TypeError if ``obj`` does not meet the interface criteria, or if
    neither ``cls`` nor ``methods`` is given.

    In all passing cases, an object with callable members is returned.  In the
    simple case, ``obj`` is returned as-is; if dict processing kicks in then
    an anonymous class is returned.

    obj
      A type, instance, or dictionary of callables.
    cls
      Optional, a type.  All public methods of cls are considered the
      interface.  An ``obj`` instance of cls will always pass, ignoring
      ``required``.
    methods
      Optional, a sequence of method names to consider as the interface.
      When given, it takes the place of the names derived from ``cls``.
    required
      Optional, a sequence of mandatory implementations. If omitted, an
      ``obj`` that provides at least one interface method is considered
      sufficient.  As a convenience, required may be a type, in which case
      every method of the interface is required.

    """
    if not cls and not methods:
        raise TypeError("a class or collection of method names are required")

    if isinstance(cls, type) and isinstance(obj, cls):
        return obj

    interface = set(methods or [m for m in dir(cls) if not m.startswith("_")])
    implemented = set(dir(obj))

    # ">=" demands every required name; ">" against the empty set demands
    # at least one interface name.
    complies = operator.ge
    if isinstance(required, type):
        required = interface
    elif not required:
        required = set()
        complies = operator.gt
    else:
        required = set(required)

    if complies(implemented.intersection(interface), required):
        return obj

    # No dict duck typing here.
    if not isinstance(obj, dict):
        qualifier = "any of" if complies is operator.gt else "all of"
        # sorted() keeps the message stable; set order is not.
        raise TypeError(
            "%r does not implement %s: %s"
            % (obj, qualifier, ", ".join(sorted(interface)))
        )

    class AnonymousInterface:
        """A callable-holding shell."""

    if cls:
        AnonymousInterface.__name__ = "Anonymous" + cls.__name__
    found = set()

    for method, impl in dictlike_iteritems(obj):
        if method not in interface:
            # one-element tuple so that a tuple key formats correctly
            raise TypeError("%r: unknown in this interface" % (method,))
        if not callable(impl):
            raise TypeError("%r=%r is not callable" % (method, impl))
        setattr(AnonymousInterface, method, staticmethod(impl))
        found.add(method)

    if complies(found, required):
        return AnonymousInterface

    raise TypeError(
        "dictionary does not contain required keys %s"
        % ", ".join(sorted(required - found))
    )

1135 

1136 

_GFD = TypeVar("_GFD", bound="generic_fn_descriptor[Any]")


class generic_fn_descriptor(Generic[_T_co]):
    """Descriptor which proxies a function when the attribute is not
    present in dict

    This base class only stores the wrapped function and its metadata;
    ``__get__``, ``_reset`` and ``reset`` all raise ``NotImplementedError``
    here.  The "memoized" and "non-memoized" implementation subclasses are
    hidden from type checkers, as Mypy seems to not be able to handle seeing
    multiple kinds of descriptor classes used for the same attribute.

    """

    fget: Callable[..., _T_co]
    __doc__: Optional[str]
    __name__: str

    def __init__(self, fget: Callable[..., _T_co], doc: Optional[str] = None):
        # an explicit (truthy) doc wins over the function's own docstring
        self.fget = fget
        self.__doc__ = doc if doc else fget.__doc__
        self.__name__ = fget.__name__

    @overload
    def __get__(self: _GFD, obj: None, cls: Any) -> _GFD: ...

    @overload
    def __get__(self, obj: object, cls: Any) -> _T_co: ...

    def __get__(self: _GFD, obj: Any, cls: Any) -> Union[_GFD, _T_co]:
        raise NotImplementedError()

    if TYPE_CHECKING:
        # declared for the type checker only; not defined at runtime

        def __set__(self, instance: Any, value: Any) -> None: ...

        def __delete__(self, instance: Any) -> None: ...

    def _reset(self, obj: Any) -> None:
        raise NotImplementedError()

    @classmethod
    def reset(cls, obj: Any, name: str) -> None:
        raise NotImplementedError()

1181 

1182 

class _non_memoized_property(generic_fn_descriptor[_T_co]):
    """A plain descriptor that proxies a function on every access.

    Primary rationale is to provide a plain attribute that's
    compatible with memoized_property which is also recognized as equivalent
    by mypy.

    """

    if not TYPE_CHECKING:

        def __get__(self, obj, cls):
            # class-level access hands back the descriptor itself
            return self if obj is None else self.fget(obj)

1198 

1199 

class _memoized_property(generic_fn_descriptor[_T_co]):
    """A read-only @property that is only evaluated once.

    The computed value is stored in the instance ``__dict__`` under the
    wrapped function's name.
    """

    if not TYPE_CHECKING:

        def __get__(self, obj, cls):
            if obj is None:
                return self
            value = self.fget(obj)
            obj.__dict__[self.__name__] = value
            return value

    def _reset(self, obj):
        """Discard the value memoized on ``obj`` for this property."""
        _memoized_property.reset(obj, self.__name__)

    @classmethod
    def reset(cls, obj, name):
        """Remove ``name`` from ``obj.__dict__`` if it is present."""
        obj.__dict__.pop(name, None)

1217 

1218 

# despite many attempts to get Mypy to recognize an overridden descriptor
# where one is memoized and the other isn't, there seems to be no reliable
# way other than completely deceiving the type checker into thinking there
# is just one single descriptor type everywhere.  Otherwise, if a superclass
# has non-memoized and a subclass has memoized, that requires
# "class memoized(non_memoized)".  but then if a superclass has memoized and
# a subclass has non-memoized, the class hierarchy of the descriptors
# would need to be reversed; "class non_memoized(memoized)".  so there's no
# way to achieve this.
# additional issues, RO properties:
# https://github.com/python/mypy/issues/12440
if TYPE_CHECKING:
    # allow memoized and non-memoized to be freely mixed by having them
    # be the same class
    memoized_property = generic_fn_descriptor
    non_memoized_property = generic_fn_descriptor

    # for read only situations, mypy only sees @property as read only.
    # read only is needed when a subtype specializes the return type
    # of a property, meaning assignment needs to be disallowed
    ro_memoized_property = property
    ro_non_memoized_property = property

else:
    # at runtime the "ro_" names share the real implementations
    memoized_property = _memoized_property
    ro_memoized_property = _memoized_property
    non_memoized_property = _non_memoized_property
    ro_non_memoized_property = _non_memoized_property

1245 

1246 

def memoized_instancemethod(fn: _F) -> _F:
    """Decorate a method so that its return value is memoized.

    On first call the result is computed, and a function that returns that
    result is stored in the instance ``__dict__`` under the method's name.

    Best applied to no-arg methods: memoization is not sensitive to
    argument values, and will always return the same value even when
    called with different arguments.

    """

    def oneshot(self, *args, **kw):
        cached = fn(self, *args, **kw)

        def memo(*a, **kw):
            return cached

        memo.__name__, memo.__doc__ = fn.__name__, fn.__doc__
        # later lookups on this instance find "memo" before the class
        # attribute
        self.__dict__[fn.__name__] = memo
        return cached

    return update_wrapper(oneshot, fn)  # type: ignore

1268 

1269 

class HasMemoized:
    """A mixin class that maintains the names of memoized elements in a
    collection for easy cache clearing, generative, etc.

    Memoized values live in the instance ``__dict__``; their names are
    accumulated in ``_memoized_keys``.

    """

    if not TYPE_CHECKING:
        # support classes that want to have __slots__ with an explicit
        # slot for __dict__.  not sure if that requires base __slots__ here.
        __slots__ = ()

    _memoized_keys: FrozenSet[str] = frozenset()

    def _reset_memoizations(self) -> None:
        """Drop every memoized value recorded in ``_memoized_keys``."""
        state = self.__dict__
        for name in self._memoized_keys:
            state.pop(name, None)

    def _assert_no_memoizations(self) -> None:
        """Assert that no recorded memoized name is currently stored."""
        state = self.__dict__
        for name in self._memoized_keys:
            assert name not in state

    def _set_memoized_attribute(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key`` and record ``key`` as memoized."""
        self.__dict__[key] = value
        self._memoized_keys |= {key}

    class memoized_attribute(memoized_property[_T]):
        """A read-only @property that is only evaluated once.

        :meta private:

        """

        fget: Callable[..., _T]
        __doc__: Optional[str]
        __name__: str

        def __init__(self, fget: Callable[..., _T], doc: Optional[str] = None):
            self.fget = fget
            self.__doc__ = doc if doc else fget.__doc__
            self.__name__ = fget.__name__

        @overload
        def __get__(self: _MA, obj: None, cls: Any) -> _MA: ...

        @overload
        def __get__(self, obj: Any, cls: Any) -> _T: ...

        def __get__(self, obj, cls):
            if obj is None:
                return self
            value = self.fget(obj)
            obj.__dict__[self.__name__] = value
            # record the name so _reset_memoizations() can clear it
            obj._memoized_keys |= {self.__name__}
            return value

    @classmethod
    def memoized_instancemethod(cls, fn: _F) -> _F:
        """Decorate a method so that its return value is memoized.

        :meta private:

        """

        def oneshot(self: Any, *args: Any, **kw: Any) -> Any:
            cached = fn(self, *args, **kw)

            def memo(*a, **kw):
                return cached

            memo.__name__, memo.__doc__ = fn.__name__, fn.__doc__
            self.__dict__[fn.__name__] = memo
            self._memoized_keys |= {fn.__name__}
            return cached

        return update_wrapper(oneshot, fn)  # type: ignore

1345 

1346 

# type checkers see a plain read-only ``property``; at runtime the name is
# the memoizing descriptor defined on HasMemoized.
if not TYPE_CHECKING:
    HasMemoized_ro_memoized_attribute = HasMemoized.memoized_attribute
else:
    HasMemoized_ro_memoized_attribute = property

1351 

1352 

class MemoizedSlots:
    """Apply memoized items to an object using a __getattr__ scheme.

    This allows the functionality of memoized_property and
    memoized_instancemethod to be available to a class using __slots__.

    For an attribute ``x``, a class-level ``_memoized_attr_x`` is called
    once and its result stored with ``setattr``; a class-level
    ``_memoized_method_x`` is exposed as a method whose first result is
    stored and returned on every later call.

    The memoized get is not threadsafe under freethreading and the
    creator method may in extremely rare cases be called more than once.

    """

    __slots__ = ()

    def _fallback_getattr(self, key):
        """Called when no memoized creator exists for ``key``."""
        raise AttributeError(key)

    def __getattr__(self, key: str) -> Any:
        if key.startswith(("_memoized_attr_", "_memoized_method_")):
            raise AttributeError(key)

        # to avoid recursion errors when interacting with other __getattr__
        # schemes that refer to this one, when testing for memoized method
        # look at __class__ only rather than going into __getattr__ again.
        owner = self.__class__
        attr_creator = f"_memoized_attr_{key}"
        if hasattr(owner, attr_creator):
            value = getattr(self, attr_creator)()
            setattr(self, key, value)
            return value

        method_creator = f"_memoized_method_{key}"
        if not hasattr(owner, method_creator):
            return self._fallback_getattr(key)

        meth = getattr(self, method_creator)

        def oneshot(*args, **kw):
            result = meth(*args, **kw)

            def memo(*a, **kw):
                return result

            memo.__name__, memo.__doc__ = meth.__name__, meth.__doc__
            setattr(self, key, memo)
            return result

        oneshot.__doc__ = meth.__doc__
        return oneshot

1399 

1400 

# from paste.deploy.converters
def asbool(obj: Any) -> bool:
    """Coerce ``obj`` to a boolean.

    Strings are stripped and lower-cased, then matched against fixed
    true/false spellings; any other string raises ``ValueError``.
    Non-string objects go through ``bool()``.
    """
    if not isinstance(obj, str):
        return bool(obj)

    text = obj.strip().lower()
    if text in ("true", "yes", "on", "y", "t", "1"):
        return True
    if text in ("false", "no", "off", "n", "f", "0"):
        return False
    raise ValueError("String is not true/false: %r" % text)

1412 

1413 

def bool_or_str(*text: str) -> Callable[[str], Union[str, bool]]:
    """Return a callable that will evaluate a string as
    boolean, or one of a set of "alternate" string values.

    A value found in ``text`` is returned unchanged; anything else is
    passed to ``asbool()``.

    """

    def bool_or_value(obj: str) -> Union[str, bool]:
        return obj if obj in text else asbool(obj)

    return bool_or_value

1427 

1428 

def asint(value: Any) -> Optional[int]:
    """Coerce to integer, passing ``None`` through unchanged."""

    return None if value is None else int(value)

1435 

1436 

def coerce_kw_type(
    kw: Dict[str, Any],
    key: str,
    type_: Type[Any],
    flexi_bool: bool = True,
    dest: Optional[Dict[str, Any]] = None,
) -> None:
    r"""If 'key' is present in dict 'kw', coerce its value to type 'type\_' if
    necessary.  If 'flexi_bool' is True, the string '0' is considered false
    when coercing to boolean.

    The coerced value is written to 'dest', which defaults to 'kw' itself.
    ``None`` values, and values already of type 'type\_', are left alone.
    """

    if dest is None:
        dest = kw

    if key not in kw:
        return

    value = kw[key]
    if value is None:
        return

    # a non-class ``type_`` (any callable) is always applied
    if isinstance(type_, type) and isinstance(value, type_):
        return

    if type_ is bool and flexi_bool:
        dest[key] = asbool(value)
    else:
        dest[key] = type_(value)

1461 

1462 

def constructor_key(obj: Any, cls: Type[Any]) -> Tuple[Any, ...]:
    """Produce a tuple structure that is cacheable using the __dict__ of
    obj to retrieve values

    The tuple starts with ``cls`` and continues with a ``(name, value)``
    pair for each name from ``get_cls_kwargs(cls)`` that is present in
    ``obj.__dict__``.

    """
    state = obj.__dict__
    pairs = [(name, state[name]) for name in get_cls_kwargs(cls) if name in state]
    return (cls, *pairs)

1472 

1473 

def constructor_copy(obj: _T, cls: Type[_T], *args: Any, **kw: Any) -> _T:
    """Instantiate cls using the __dict__ of obj as constructor arguments.

    Names come from ``get_cls_kwargs(cls)``.  Explicitly passed keyword
    arguments take precedence over values found in ``obj.__dict__``.

    """

    state = obj.__dict__
    for name in get_cls_kwargs(cls).difference(kw):
        if name in state:
            kw[name] = state[name]
    return cls(*args, **kw)

1486 

1487 

def counter() -> Callable[[], int]:
    """Return a threadsafe counter function.

    Each call of the returned function yields the next integer, starting
    at 1; the increment is performed while holding a lock.
    """

    guard = threading.Lock()
    sequence = itertools.count(1)

    # avoid the 2to3 "next" transformation...
    def _next():
        with guard:
            return next(sequence)

    return _next

1500 

1501 

def duck_type_collection(
    specimen: Any, default: Optional[Type[Any]] = None
) -> Optional[Type[Any]]:
    """Given an instance or class, guess if it is or is acting as one of
    the basic collection types: list, set and dict.  If the __emulates__
    property is present, return that preferentially.

    Without ``__emulates__``, the specimen is first checked for being (a
    subclass or instance of) list, set or dict, then for having an
    ``append``, ``add`` or ``set`` attribute.  ``default`` is returned when
    nothing matches.
    """

    if hasattr(specimen, "__emulates__"):
        emulated = specimen.__emulates__
        # canonicalize set vs sets.Set to a standard: the builtin set
        if emulated is not None and issubclass(emulated, set):
            return set
        return emulated  # type: ignore

    isa = issubclass if isinstance(specimen, type) else isinstance
    for builtin in (list, set, dict):
        if isa(specimen, builtin):
            return builtin

    for marker, builtin in (("append", list), ("add", set), ("set", dict)):
        if hasattr(specimen, marker):
            return builtin

    return default

1535 

1536 

def assert_arg_type(
    arg: Any, argtype: Union[Tuple[Type[Any], ...], Type[Any]], name: str
) -> Any:
    """Return ``arg`` if it is an instance of ``argtype``.

    Otherwise raise :class:`.exc.ArgumentError` naming the argument, the
    expected type (or types, when ``argtype`` is a tuple) and the type
    actually received.
    """
    if isinstance(arg, argtype):
        return arg

    if not isinstance(argtype, tuple):
        raise exc.ArgumentError(
            "Argument '%s' is expected to be of type '%s', got '%s'"
            % (name, argtype, type(arg))
        )

    expected = " or ".join("'%s'" % a for a in argtype)
    raise exc.ArgumentError(
        "Argument '%s' is expected to be one of type %s, got '%s'"
        % (name, expected, type(arg))
    )

1553 

1554 

def dictlike_iteritems(dictlike):
    """Return (key, value) pairs for almost any dict-like object.

    An object with ``items()`` yields a list of its items.  Otherwise a
    generator is returned that pairs each key from ``iterkeys()`` (or,
    failing that, ``keys()``) with the value obtained via ``__getitem__``
    or ``get``.  Raises TypeError when neither a value getter nor a key
    source is available.
    """

    if hasattr(dictlike, "items"):
        return list(dictlike.items())

    fallback = getattr(dictlike, "get", None)
    getter = getattr(dictlike, "__getitem__", fallback)
    if getter is None:
        raise TypeError("Object '%r' is not dict-like" % dictlike)

    if hasattr(dictlike, "iterkeys"):

        def iterator():
            for key in dictlike.iterkeys():
                yield key, getter(key)

        return iterator()

    if hasattr(dictlike, "keys"):
        return ((key, getter(key)) for key in dictlike.keys())

    raise TypeError("Object '%r' is not dict-like" % dictlike)

1577 

1578 

class classproperty(property):
    """A decorator that behaves like @property except that operates
    on classes rather than instances.

    The getter is always invoked with the owning class, whether the
    attribute is reached through the class or through an instance.

    The decorator is currently special when using the declarative
    module, but note that the
    :class:`~.sqlalchemy.ext.declarative.declared_attr`
    decorator should be used for this purpose with declarative.

    """

    fget: Callable[[Any], Any]

    def __init__(self, fget: Callable[[Any], Any], *arg: Any, **kw: Any):
        super().__init__(fget, *arg, **kw)
        # carry the getter's docstring over to the descriptor
        self.__doc__ = fget.__doc__

    def __get__(self, obj: Any, cls: Optional[type] = None) -> Any:
        getter = self.fget
        return getter(cls)

1598 

1599 

class hybridproperty(Generic[_T]):
    """Read-only descriptor with separate instance- and class-level getters.

    The decorated function serves both levels until ``classlevel()``
    registers a different function for access through the class.
    """

    def __init__(self, func: Callable[..., _T]):
        self.func = func
        self.clslevel = func

    def __get__(self, instance: Any, owner: Any) -> _T:
        if instance is not None:
            return self.func(instance)
        return self.clslevel(owner)

    def classlevel(self, func: Callable[..., Any]) -> hybridproperty[_T]:
        """Register ``func`` as the class-level getter; returns ``self``."""
        self.clslevel = func
        return self

1615 

1616 

class rw_hybridproperty(Generic[_T]):
    """Descriptor with instance- and class-level getters plus a setter.

    The decorated function serves both access levels until ``classlevel()``
    registers another one.  Assignment is delegated to the function given
    to ``setter()``, which must have been registered beforehand.
    """

    def __init__(self, func: Callable[..., _T]):
        self.func = func
        self.clslevel = func
        self.setfn: Optional[Callable[..., Any]] = None

    def __get__(self, instance: Any, owner: Any) -> _T:
        if instance is not None:
            return self.func(instance)
        return self.clslevel(owner)

    def __set__(self, instance: Any, value: Any) -> None:
        assert self.setfn is not None
        self.setfn(instance, value)

    def setter(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]:
        """Register ``func`` as the setter; returns ``self``."""
        self.setfn = func
        return self

    def classlevel(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]:
        """Register ``func`` as the class-level getter; returns ``self``."""
        self.clslevel = func
        return self

1641 

1642 

class hybridmethod(Generic[_T]):
    """Decorate a function as cls- or instance- level.

    Through an instance the decorated function is bound to that instance;
    through the class, the class-level function (the same function unless
    ``classlevel()`` replaced it) is bound to the class.
    """

    def __init__(self, func: Callable[..., _T]):
        self.func = func
        self.__func__ = func
        self.clslevel = func

    def __get__(self, instance: Any, owner: Any) -> Callable[..., _T]:
        if instance is not None:
            return self.func.__get__(  # type: ignore[no-any-return]
                instance, owner
            )
        return self.clslevel.__get__(  # type: ignore[no-any-return]
            owner, owner.__class__
        )

    def classlevel(self, func: Callable[..., Any]) -> hybridmethod[_T]:
        """Register ``func`` as the class-level variant; returns ``self``."""
        self.clslevel = func
        return self

1663 

1664 

class symbol(int):
    """A constant symbol.

    >>> symbol("foo") is symbol("foo")
    True
    >>> symbol("foo")
    symbol('foo')

    A slight refinement of the MAGICCOOKIE=object() pattern.  The primary
    advantage of symbol() is its repr().  They are also singletons.

    Repeated calls of symbol('name') will all return the same instance.

    The integer value is ``canonical`` when given, otherwise ``hash(name)``.

    """

    name: str

    symbols: Dict[str, symbol] = {}
    _lock = threading.Lock()

    def __new__(
        cls,
        name: str,
        doc: Optional[str] = None,
        canonical: Optional[int] = None,
    ) -> symbol:
        with cls._lock:
            existing = cls.symbols.get(name)
            if existing is not None:
                # NOTE(review): a falsy canonical (0) is never compared
                # against the registered value -- confirm this is intended.
                if canonical and canonical != existing:
                    raise TypeError(
                        f"Can't replace canonical symbol for {name!r} "
                        f"with new int value {canonical}"
                    )
                return existing

            assert isinstance(name, str)
            value = hash(name) if canonical is None else canonical
            created = int.__new__(symbol, value)
            created.name = name
            if doc:
                created.__doc__ = doc

            # NOTE: we should ultimately get rid of this global thing,
            # however, currently it is to support pickling.  The best
            # change would be when we are on py3.11 at a minimum, we
            # switch to stdlib enum.IntFlag.
            cls.symbols[name] = created
            return created

    def __reduce__(self):
        return symbol, (self.name, "x", int(self))

    def __str__(self):
        return repr(self)

    def __repr__(self):
        return f"symbol({self.name!r})"

1723 

1724 

class _IntFlagMeta(type):
    """Metaclass that turns integer class attributes into ``symbol`` members.

    Dunder attributes are skipped.  Every other integer attribute is
    replaced by ``symbol(name, canonical=value)``; a non-integer attribute
    raises TypeError unless its name starts with an underscore.  The members
    are exposed as ``_items`` (a list) and ``__members__`` (an immutable
    name -> symbol mapping).
    """

    def __init__(
        cls,
        classname: str,
        bases: Tuple[Type[Any], ...],
        dict_: Dict[str, Any],
        **kw: Any,
    ) -> None:
        members: List[symbol] = []
        cls._items = members
        for attr_name, attr_value in dict_.items():
            if re.match(r"^__.*__$", attr_name):
                continue
            if not isinstance(attr_value, int):
                if attr_name.startswith("_"):
                    continue
                raise TypeError("Expected integer values for IntFlag")
            member = symbol(attr_name, canonical=attr_value)
            setattr(cls, attr_name, member)
            members.append(member)

        cls.__members__ = _collections.immutabledict(
            {member.name: member for member in members}
        )

    def __iter__(self) -> Iterator[symbol]:
        raise NotImplementedError(
            "iter not implemented to ensure compatibility with "
            "Python 3.11 IntFlag.  Please use __members__.  See "
            "https://github.com/python/cpython/issues/99304"
        )

1757 

1758 

class _FastIntFlag(metaclass=_IntFlagMeta):
    """An 'IntFlag' copycat that isn't slow when performing bitwise
    operations.

    The public name ``FastIntFlag`` is bound to ``enum.IntFlag`` under
    TYPE_CHECKING and to ``_FastIntFlag`` otherwise.

    """


if not TYPE_CHECKING:
    FastIntFlag = _FastIntFlag
else:
    from enum import IntFlag

    FastIntFlag = IntFlag

1775 

1776 

_E = TypeVar("_E", bound=enum.Enum)


def parse_user_argument_for_enum(
    arg: Any,
    choices: Dict[_E, List[Any]],
    name: str,
    resolve_symbol_names: bool = False,
) -> Optional[_E]:
    """Given a user parameter, parse the parameter into a chosen value
    from a list of choice objects, typically Enum values.

    The user argument can be a string name that matches the name of a
    symbol, or the symbol object itself, or any number of alternate choices
    such as True/False/ None etc.

    :param arg: the user argument.
    :param choices: dictionary of enum values to lists of possible
      entries for each.
    :param name: name of the argument.   Used in an :class:`.ArgumentError`
      that is raised if the parameter doesn't match any available argument.
    :param resolve_symbol_names: when True, ``arg`` also matches an enum
      value whose ``.name`` equals it.
    :return: the matching enum value, or ``None`` when ``arg`` is ``None``
      and no entry matched it.

    """
    for enum_value, alternates in choices.items():
        if (
            arg is enum_value
            or (resolve_symbol_names and arg == enum_value.name)
            or arg in alternates
        ):
            return enum_value

    if arg is None:
        return None

    raise exc.ArgumentError(f"Invalid value for '{name}': {arg!r}")

1812 

1813 

_creation_order = 1


def set_creation_order(instance: Any) -> None:
    """Assign a '_creation_order' sequence to the given instance.

    This allows multiple instances to be sorted in order of creation
    (typically within a single thread; the counter is not particularly
    threadsafe).

    """
    global _creation_order
    current = _creation_order
    instance._creation_order = current
    _creation_order = current + 1

1828 

1829 

def warn_exception(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Execute the given function, catching any ``Exception`` and
    converting it to a warning.

    Returns the function's result; when an exception was converted, the
    return value is ``None``.

    """
    try:
        return func(*args, **kwargs)
    except Exception as err:
        warn("%s('%s') ignored" % (type(err), err))

1839 

1840 

def ellipses_string(value, len_=25):
    """Truncate ``value`` to ``len_`` items followed by ``"..."``.

    Values no longer than ``len_`` are returned unchanged, as is any value
    for which measuring, slicing or formatting raises TypeError.
    """
    try:
        if len(value) <= len_:
            return value
        return "%s..." % value[0:len_]
    except TypeError:
        return value

1849 

1850 

class _hash_limit_string(str):
    """A string subclass that can only be hashed on a maximum amount
    of unique values.

    This is used for warnings so that we can send out parameterized warnings
    without the __warningregistry__ of the module, or the non-overridable
    "once" registry within warnings.py, overloading memory.

    The hash is derived from the un-interpolated template plus the hash of
    the interpolated text modulo ``num``; equality compares hashes only.

    """

    _hash: int

    def __new__(
        cls, value: str, num: int, args: Sequence[Any]
    ) -> _hash_limit_string:
        body = value % args
        suffix = " (this warning may be suppressed after %d occurrences)" % num
        interpolated = body + suffix
        self = super().__new__(cls, interpolated)
        bucket = hash(interpolated) % num
        self._hash = hash("%s_%d" % (value, bucket))
        return self

    def __hash__(self) -> int:
        return self._hash

    def __eq__(self, other: Any) -> bool:
        return hash(self) == hash(other)

1879 

1880 

def warn(msg: str, code: Optional[str] = None) -> None:
    """Issue a warning.

    Without ``code``, ``msg`` is emitted with :class:`.exc.SAWarning` as
    the category.  With ``code``, an :class:`.exc.SAWarning` instance
    carrying that code is emitted.

    """
    if not code:
        _warnings_warn(msg, exc.SAWarning)
        return

    _warnings_warn(exc.SAWarning(msg, code=code))

1892 

1893 

def warn_limited(msg: str, args: Sequence[Any]) -> None:
    """Issue a warning with a parameterized string, limiting the number
    of registrations.

    When ``args`` is non-empty the message is wrapped in a
    ``_hash_limit_string`` with a limit of 10; otherwise ``msg`` is emitted
    as given.

    """
    payload = _hash_limit_string(msg, 10, args) if args else msg
    _warnings_warn(payload, exc.SAWarning)

1902 

1903 

# registry of tagged functions, keyed by code object
_warning_tags: Dict[CodeType, Tuple[str, Type[Warning]]] = {}


def tag_method_for_warnings(
    message: str, category: Type[Warning]
) -> Callable[[_F], _F]:
    """Return a decorator that registers a function in ``_warning_tags``.

    The decorated function's code object is mapped to
    ``(message, category)``; the function itself is returned unchanged.
    """
    tag = (message, category)

    def go(fn):
        _warning_tags[fn.__code__] = tag
        return fn

    return go

1915 

1916 

_not_sa_pattern = re.compile(r"^(?:sqlalchemy\.(?!testing)|alembic\.)")


def _warnings_warn(
    message: Union[str, Warning],
    category: Optional[Type[Warning]] = None,
    stacklevel: int = 2,
) -> None:
    """Emit ``message`` via ``warnings.warn`` with an adjusted stacklevel.

    The stack is walked upward from ``stacklevel``: the level is bumped
    for each frame whose module ``__name__`` matches ``_not_sa_pattern``,
    until the first frame that does not.  Frames whose code object is
    registered in ``_warning_tags`` append their suffix to the message and
    supply a category if none is set yet.
    """

    if category is None and isinstance(message, Warning):
        category = type(message)

    # adjust the given stacklevel to be outside of SQLAlchemy
    try:
        frame = sys._getframe(stacklevel)
    except BaseException:
        # either we are being called from fewer than the given number of
        # stacklevels (ValueError), or _getframe() doesn't work at all on
        # this interpreter.  weird in both cases, but don't crash
        stacklevel = 0
    else:
        stacklevel_found = False
        warning_tag_found = False
        while frame is not None:
            # using __name__ here requires that we have __name__ in the
            # __globals__ of the decorated string functions we make also.
            # we generate this using {"__name__": fn.__module__}
            module_name = frame.f_globals.get("__name__", "")
            if not stacklevel_found and not _not_sa_pattern.match(
                module_name
            ):
                # stop incrementing stack level if an out-of-SQLA line
                # were found.
                stacklevel_found = True

                # however, for the warning tag thing, we have to keep
                # scanning up the whole traceback

            if frame.f_code in _warning_tags:
                warning_tag_found = True
                _suffix, _category = _warning_tags[frame.f_code]
                category = category or _category
                message = f"{message} ({_suffix})"

            frame = frame.f_back  # type: ignore[assignment]

            if not stacklevel_found:
                stacklevel += 1
            elif warning_tag_found:
                break

    # +1 accounts for this function's own frame
    if category is None:
        warnings.warn(message, stacklevel=stacklevel + 1)
    else:
        warnings.warn(message, category, stacklevel=stacklevel + 1)

1973 

1974 

def only_once(
    fn: Callable[..., _T], retry_on_exception: bool
) -> Callable[..., Optional[_T]]:
    """Decorate the given function to be a no-op after it is called exactly
    once.

    Later calls return ``None``.  If the call raises and
    ``retry_on_exception`` is true, the function is re-armed so the next
    call runs it again; the exception propagates either way.
    """

    pending = [fn]

    def go(*arg: Any, **kw: Any) -> Optional[_T]:
        # strong reference fn so that it isn't garbage collected,
        # which interferes with the event system's expectations
        strong_fn = fn  # noqa
        if not pending:
            return None

        once_fn = pending.pop()
        try:
            return once_fn(*arg, **kw)
        except BaseException:
            if retry_on_exception:
                pending.insert(0, once_fn)
            raise

    return go

1999 

2000 

_SQLA_RE = re.compile(r"sqlalchemy/([a-z_]+/){0,2}[a-z_]+\.py")
_UNITTEST_RE = re.compile(r"unit(?:2|test2?/)")


def chop_traceback(
    tb: List[str],
    exclude_prefix: re.Pattern[str] = _UNITTEST_RE,
    exclude_suffix: re.Pattern[str] = _SQLA_RE,
) -> List[str]:
    """Chop extraneous lines off beginning and end of a traceback.

    :param tb:
      a list of traceback lines as returned by ``traceback.format_stack()``

    :param exclude_prefix:
      a regular expression object matching lines to skip at beginning of
      ``tb``

    :param exclude_suffix:
      a regular expression object matching lines to skip at end of ``tb``

    :return: the remaining slice of ``tb``, as a new list
    """
    # half-open window [first, stop) of lines to keep
    first, stop = 0, len(tb)
    while first < stop and exclude_prefix.search(tb[first]):
        first += 1
    while first < stop and exclude_suffix.search(tb[stop - 1]):
        stop -= 1
    return tb[first:stop]

2029 

2030 

def attrsetter(attrname):
    """Return a ``set(obj, value)`` function assigning ``obj.<attrname>``.

    The function is generated from source text with ``exec``, so
    ``attrname`` is spliced into code verbatim.
    """
    # NOTE(review): attrname must come from trusted code, never user input
    code = "def set(obj, value): obj.%s = value" % attrname
    env = {"attrname": attrname, "code": code}
    exec(code, env)
    return env["set"]

2036 

2037 

dunders_re = re.compile("^__.+__$")


class TypingOnly:
    """A mixin class that marks a class as 'typing only', meaning it has
    absolutely no methods, attributes, or runtime functionality whatsoever.

    A class listing ``TypingOnly`` directly among its bases may only
    define dunder names; anything else raises ``AssertionError`` when the
    class is created.

    """

    __slots__ = ()

    def __init_subclass__(cls, **kw: Any) -> None:
        if TypingOnly in cls.__bases__:
            remaining = set()
            for name in cls.__dict__:
                if dunders_re.match(name) is None:
                    remaining.add(name)
            if remaining:
                raise AssertionError(
                    f"Class {cls} directly inherits TypingOnly but has "
                    f"additional attributes {remaining}."
                )
        super().__init_subclass__(**kw)

2060 

2061 

class EnsureKWArg:
    r"""Apply translation of functions to accept \**kw arguments if they
    don't already.

    Used to ensure cross-compatibility with third party legacy code, for things
    like compiler visit methods that need to accept ``**kw`` arguments,
    but may have been copied from old code that didn't accept them.

    """

    ensure_kwarg: str
    """a regular expression that indicates method names for which the method
    should accept ``**kw`` arguments.

    The class will scan for methods matching the name template and decorate
    them if necessary to ensure ``**kw`` parameters are accepted.

    """

    def __init_subclass__(cls) -> None:
        pattern = cls.ensure_kwarg
        if pattern:
            namespace = cls.__dict__
            for attr_name in namespace:
                if not re.match(pattern, attr_name):
                    continue
                candidate = namespace[attr_name]
                if compat.inspect_getfullargspec(candidate).varkw:
                    # already accepts **kw
                    continue
                setattr(cls, attr_name, cls._wrap_w_kw(candidate))
        super().__init_subclass__()

    @classmethod
    def _wrap_w_kw(cls, fn: Callable[..., Any]) -> Callable[..., Any]:
        """Wrap ``fn`` so keyword arguments are accepted and discarded."""

        def wrap(*arg: Any, **kw: Any) -> Any:
            return fn(*arg)

        return update_wrapper(wrap, fn)

2101 

2102 

def wrap_callable(wrapper, fn):
    """Variant of ``functools.update_wrapper()`` that also handles callable
    objects lacking a ``__name__``, such as instances with a ``__call__()``
    method.

    :param wrapper: the function to be decorated with ``fn``'s metadata;
     it is modified in place and returned.

    :param fn:
      object with __call__ method

    """
    if hasattr(fn, "__name__"):
        # ordinary function-like object; the stdlib handles it
        return update_wrapper(wrapper, fn)

    # no __name__ available: name the wrapper after the object's class
    wrapper.__name__ = fn.__class__.__name__
    if hasattr(fn, "__module__"):
        wrapper.__module__ = fn.__module__

    # prefer the docstring of __call__, falling back to the object's own
    doc = getattr(fn.__call__, "__doc__", None) or fn.__doc__
    if doc:
        wrapper.__doc__ = doc

    return wrapper

2125 

2126 

def quoted_token_parser(value):
    """Split a dotted identifier into its tokens, honoring double-quoted
    names.

    Dots inside a quoted section do not split, the quote characters
    themselves are dropped, and a doubled double quote inside a quoted
    section (SQL style) produces a single literal ``"`` character.

    E.g.::

        >>> quoted_token_parser("name")
        ['name']
        >>> quoted_token_parser("schema.name")
        ['schema', 'name']
        >>> quoted_token_parser('"Schema"."Name"')
        ['Schema', 'Name']
        >>> quoted_token_parser('"Schema"."Name""Foo"')
        ['Schema', 'Name"Foo']

    """

    if '"' not in value:
        # fast path: nothing quoted, a plain split is sufficient
        return value.split(".")

    in_quotes = False
    tokens: List[List[str]] = [[]]
    last = len(value) - 1
    pos = 0
    while pos <= last:
        ch = value[pos]
        if ch != '"':
            if ch == "." and not in_quotes:
                # unquoted dot: start the next token
                tokens.append([])
            else:
                tokens[-1].append(ch)
        elif in_quotes and pos < last and value[pos + 1] == '"':
            # escaped quote inside a quoted section; consume both chars
            tokens[-1].append('"')
            pos += 1
        else:
            # opening or closing quote
            in_quotes = not in_quotes
        pos += 1

    return ["".join(chars) for chars in tokens]

2169 

2170 

def add_parameter_text(params: Any, text: str) -> Callable[[_F], _F]:
    """Return a decorator that injects ``text`` into the documentation of
    the given parameter(s) within the decorated function's docstring.

    :param params: a parameter name or collection of names; normalized
     with ``_collections.to_list()``.
    :param text: the text passed to :func:`inject_param_text` for each of
     those parameters.

    """
    params = _collections.to_list(params)

    def decorate(fn):
        docstring = fn.__doc__ or ""
        if docstring:
            docstring = inject_param_text(
                docstring, dict.fromkeys(params, text)
            )
        # note a missing docstring is replaced with an empty string
        fn.__doc__ = docstring
        return fn

    return decorate

2182 

2183 

2184def _dedent_docstring(text: str) -> str: 

2185 split_text = text.split("\n", 1) 

2186 if len(split_text) == 1: 

2187 return text 

2188 else: 

2189 firstline, remaining = split_text 

2190 if not firstline.startswith(" "): 

2191 return firstline + "\n" + textwrap.dedent(remaining) 

2192 else: 

2193 return textwrap.dedent(text) 

2194 

2195 

def inject_docstring_text(
    given_doctext: Optional[str], injecttext: str, pos: int
) -> str:
    """Insert ``injecttext`` into a docstring at a blank-line boundary.

    The docstring is dedented first.  Candidate insertion points are line
    zero followed by the index of every blank line; ``pos`` selects among
    those candidates and is capped at the last one.

    :param given_doctext: the existing docstring, or ``None`` / empty.
    :param injecttext: text to insert; it is dedented, and an empty line
     is placed before it if its first line is not already empty.
    :param pos: index into the list of candidate insertion points.
    :return: the combined docstring text.

    """
    doc_lines = _dedent_docstring(given_doctext or "").split("\n")
    if len(doc_lines) == 1:
        doc_lines.append("")

    new_lines = textwrap.dedent(injecttext).split("\n")
    if new_lines[0]:
        new_lines.insert(0, "")

    # line 0 is always a candidate, ahead of each blank line
    candidates = [0]
    candidates.extend(
        index for index, text in enumerate(doc_lines) if not text.strip()
    )

    target = candidates[min(pos, len(candidates) - 1)]
    doc_lines[target:target] = new_lines
    return "\n".join(doc_lines)

2214 

2215 

_param_reg = re.compile(r"(\s+):param (.+?):")


def _param_injection(
    match: "re.Match[str]",
    upcoming: Sequence[str],
    inject_params: Dict[str, str],
) -> Optional[str]:
    """Return the indented text to inject for a matched ``:param:`` line,
    or ``None`` if that parameter is not present in ``inject_params``."""
    param = match.group(2).lstrip("*")
    if param not in inject_params:
        return None

    # default indent to that of :param: plus one
    indent = " " * len(match.group(1)) + " "

    # but if the next line has text, use that line's indentation
    if upcoming:
        following = re.match(r"(\s+)\S", upcoming[0])
        if following:
            indent = " " * len(following.group(1))

    return indent + inject_params[param]


def inject_param_text(doctext: str, inject_params: Dict[str, str]) -> str:
    """Append text to the documentation of selected ``:param <name>:``
    entries within a docstring.

    Each indented ``:param <name>:`` line whose name (with leading ``*``
    characters removed) is a key of ``inject_params`` gets the corresponding
    value inserted after that parameter's description.  The description is
    considered finished at the next blank line, at the next ``:param:``
    line, or at the end of the docstring.

    :param doctext: the docstring text to process.
    :param inject_params: mapping of parameter name to the text to inject.
    :return: the docstring text with injections applied, lines joined by
     newlines.

    """
    doclines = collections.deque(doctext.splitlines())
    lines = []

    # TODO: this is not working for params like ":param case_sensitive=True:"

    to_inject = None
    while doclines:
        line = doclines.popleft()

        m = _param_reg.match(line)

        if to_inject is None:
            if m:
                to_inject = _param_injection(m, doclines, inject_params)
        elif m:
            # a new :param: begins directly after the previous one; flush
            # the pending text, then see if this parameter wants text too
            lines.extend(["\n", to_inject, "\n"])
            to_inject = _param_injection(m, doclines, inject_params)
        elif not line.rstrip():
            lines.extend([line, to_inject, "\n"])
            to_inject = None
        elif line.endswith("::") and doclines:
            # TODO: this still won't cover if the code example itself has
            # blank lines in it, need to detect those via indentation.
            # (a "::" on the very last line has nothing following it to
            # carry along, so it is handled as an ordinary line)
            lines.extend([line, doclines.popleft()])
            continue
        lines.append(line)

    if to_inject is not None:
        # the docstring ended while a parameter's text was still pending
        lines.extend(["\n", to_inject])

    return "\n".join(lines)

2260 

2261 

def repr_tuple_names(names: List[str]) -> Optional[str]:
    """Render a list of names as a short comma-separated string.

    At most four names are shown; for longer lists the first three and the
    last one are kept, with ``...`` standing in for the omitted middle.
    Names longer than 11 characters are cut to 11 characters followed by
    ``..``.  Returns ``None`` for an empty list.

    """
    if not names:
        return None

    def shorten(name: str) -> str:
        if len(name) > 11:
            return "%s.." % name[:11]
        return name

    if len(names) <= 4:
        return ", ".join([shorten(name) for name in names[0:4]])

    leading = ", ".join([shorten(name) for name in names[0:3]])
    return "%s, ..., %s" % (leading, shorten(names[-1]))

2274 

2275 

def has_compiled_ext(raise_=False):
    """Report whether the cython extensions are available, based on the
    ``HAS_CYEXTENSION`` flag of the ``_has_cython`` module.

    :param raise_: when true, raise ``ImportError`` instead of returning
     ``False`` if the extensions are not present.
    :return: ``True`` if the extensions are present, ``False`` otherwise.

    """
    from ._has_cython import HAS_CYEXTENSION

    if not HAS_CYEXTENSION:
        if raise_:
            raise ImportError(
                "cython extensions were expected to be installed, "
                "but are not present"
            )
        return False

    return True

2288 

2289 

def load_uncompiled_module(module: _M) -> _M:
    """Load the non-compiled version of a module that is also
    compiled with cython.

    The ``<name>.py`` file located alongside the ``__init__.py`` of the
    module's parent package is executed as a new module object carrying
    the same fully qualified name as ``module``.  This function does not
    itself place the new module into ``sys.modules``.

    :param module: an imported module whose parent package is present in
     ``sys.modules`` and was loaded from an ``__init__.py`` file.
    :return: the newly executed module object.

    """
    full_name = module.__name__

    module_spec = module.__spec__
    assert module_spec
    parent_name = module_spec.parent
    assert parent_name

    parent_spec = sys.modules[parent_name].__spec__
    assert parent_spec
    package_path = parent_spec.origin
    assert package_path and package_path.endswith("__init__.py")

    # swap the package's __init__.py for the module's own .py file
    name = full_name.rsplit(".", 1)[-1]
    module_path = package_path.replace("__init__.py", f"{name}.py")

    py_spec = importlib.util.spec_from_file_location(full_name, module_path)
    assert py_spec
    loader = py_spec.loader
    py_module = importlib.util.module_from_spec(py_spec)
    assert loader
    loader.exec_module(py_module)
    return cast(_M, py_module)

2312 

2313 

class _Missing(enum.Enum):
    """Enumeration with a single member, ``Missing``."""

    Missing = enum.auto()


# type alias: either a value of type ``_T`` or the ``Missing`` member
MissingOr = Union[_T, Literal[_Missing.Missing]]

# module-level name for the sole ``_Missing`` member
Missing = _Missing.Missing