Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py: 51%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1045 statements  

1# util/langhelpers.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Routines to help with the creation, loading and introspection of 

10modules, classes, hierarchies, attributes, functions, and methods. 

11 

12""" 

13from __future__ import annotations 

14 

15import collections 

16import enum 

17from functools import update_wrapper 

18import importlib.util 

19import inspect 

20import itertools 

21import operator 

22import re 

23import sys 

24import textwrap 

25import threading 

26import types 

27from types import CodeType 

28from types import ModuleType 

29from typing import Any 

30from typing import Callable 

31from typing import cast 

32from typing import Dict 

33from typing import FrozenSet 

34from typing import Generic 

35from typing import Iterator 

36from typing import List 

37from typing import Literal 

38from typing import Mapping 

39from typing import NoReturn 

40from typing import Optional 

41from typing import overload 

42from typing import Sequence 

43from typing import Set 

44from typing import Tuple 

45from typing import Type 

46from typing import TYPE_CHECKING 

47from typing import TypeVar 

48from typing import Union 

49import warnings 

50 

51from . import _collections 

52from . import compat 

53from .. import exc 

54 

55_T = TypeVar("_T") 

56_T_co = TypeVar("_T_co", covariant=True) 

57_F = TypeVar("_F", bound=Callable[..., Any]) 

58_MA = TypeVar("_MA", bound="HasMemoized.memoized_attribute[Any]") 

59_M = TypeVar("_M", bound=ModuleType) 

60 

61if compat.py314: 

62 # vendor a minimal form of get_annotations per 

63 # https://github.com/python/cpython/issues/133684#issuecomment-2863841891 

64 

65 from annotationlib import call_annotate_function # type: ignore 

66 from annotationlib import Format 

67 

68 def _get_and_call_annotate(obj, format): # noqa: A002 

69 annotate = getattr(obj, "__annotate__", None) 

70 if annotate is not None: 

71 ann = call_annotate_function(annotate, format, owner=obj) 

72 if not isinstance(ann, dict): 

73 raise ValueError(f"{obj!r}.__annotate__ returned a non-dict") 

74 return ann 

75 return None 

76 

77 # this is ported from py3.13.0a7 

78 _BASE_GET_ANNOTATIONS = type.__dict__["__annotations__"].__get__ # type: ignore # noqa: E501 

79 

80 def _get_dunder_annotations(obj): 

81 if isinstance(obj, type): 

82 try: 

83 ann = _BASE_GET_ANNOTATIONS(obj) 

84 except AttributeError: 

85 # For static types, the descriptor raises AttributeError. 

86 return {} 

87 else: 

88 ann = getattr(obj, "__annotations__", None) 

89 if ann is None: 

90 return {} 

91 

92 if not isinstance(ann, dict): 

93 raise ValueError( 

94 f"{obj!r}.__annotations__ is neither a dict nor None" 

95 ) 

96 return dict(ann) 

97 

98 def _vendored_get_annotations( 

99 obj: Any, *, format: Format # noqa: A002 

100 ) -> Mapping[str, Any]: 

101 """A sparse implementation of annotationlib.get_annotations()""" 

102 

103 try: 

104 ann = _get_dunder_annotations(obj) 

105 except Exception: 

106 pass 

107 else: 

108 if ann is not None: 

109 return dict(ann) 

110 

111 # But if __annotations__ threw a NameError, we try calling __annotate__ 

112 ann = _get_and_call_annotate(obj, format) 

113 if ann is None: 

114 # If that didn't work either, we have a very weird object: 

115 # evaluating 

116 # __annotations__ threw NameError and there is no __annotate__. 

117 # In that case, 

118 # we fall back to trying __annotations__ again. 

119 ann = _get_dunder_annotations(obj) 

120 

121 if ann is None: 

122 if isinstance(obj, type) or callable(obj): 

123 return {} 

124 raise TypeError(f"{obj!r} does not have annotations") 

125 

126 if not ann: 

127 return {} 

128 

129 return dict(ann) 

130 

131 def get_annotations(obj: Any) -> Mapping[str, Any]: 

132 # FORWARDREF has the effect of giving us ForwardRefs and not 

133 # actually trying to evaluate the annotations. We need this so 

134 # that the annotations act as much like 

135 # "from __future__ import annotations" as possible, which is going 

136 # away in future python as a separate mode 

137 return _vendored_get_annotations(obj, format=Format.FORWARDREF) 

138 

139else: 

140 

141 def get_annotations(obj: Any) -> Mapping[str, Any]: 

142 return inspect.get_annotations(obj) 

143 

144 

145def md5_hex(x: Any) -> str: 

146 x = x.encode("utf-8") 

147 m = compat.md5_not_for_security() 

148 m.update(x) 

149 return cast(str, m.hexdigest()) 

150 

151 

152class safe_reraise: 

153 """Reraise an exception after invoking some 

154 handler code. 

155 

156 Stores the existing exception info before 

157 invoking so that it is maintained across a potential 

158 coroutine context switch. 

159 

160 e.g.:: 

161 

162 try: 

163 sess.commit() 

164 except: 

165 with safe_reraise(): 

166 sess.rollback() 

167 

168 TODO: we should at some point evaluate current behaviors in this regard 

169 based on current greenlet, gevent/eventlet implementations in Python 3, and 

170 also see the degree to which our own asyncio (based on greenlet also) is 

171 impacted by this. .rollback() will cause IO / context switch to occur in 

172 all these scenarios; what happens to the exception context from an 

173 "except:" block if we don't explicitly store it? Original issue was #2703. 

174 

175 """ 

176 

177 __slots__ = ("_exc_info",) 

178 

179 _exc_info: Union[ 

180 None, 

181 Tuple[ 

182 Type[BaseException], 

183 BaseException, 

184 types.TracebackType, 

185 ], 

186 Tuple[None, None, None], 

187 ] 

188 

189 def __enter__(self) -> None: 

190 self._exc_info = sys.exc_info() 

191 

192 def __exit__( 

193 self, 

194 type_: Optional[Type[BaseException]], 

195 value: Optional[BaseException], 

196 traceback: Optional[types.TracebackType], 

197 ) -> NoReturn: 

198 assert self._exc_info is not None 

199 # see #2703 for notes 

200 if type_ is None: 

201 exc_type, exc_value, exc_tb = self._exc_info 

202 assert exc_value is not None 

203 self._exc_info = None # remove potential circular references 

204 raise exc_value.with_traceback(exc_tb) 

205 else: 

206 self._exc_info = None # remove potential circular references 

207 assert value is not None 

208 raise value.with_traceback(traceback) 

209 

210 

211def walk_subclasses(cls: Type[_T]) -> Iterator[Type[_T]]: 

212 seen: Set[Any] = set() 

213 

214 stack = [cls] 

215 while stack: 

216 cls = stack.pop() 

217 if cls in seen: 

218 continue 

219 else: 

220 seen.add(cls) 

221 stack.extend(cls.__subclasses__()) 

222 yield cls 

223 

224 

225def string_or_unprintable(element: Any) -> str: 

226 if isinstance(element, str): 

227 return element 

228 else: 

229 try: 

230 return str(element) 

231 except Exception: 

232 return "unprintable element %r" % element 

233 

234 

235def clsname_as_plain_name( 

236 cls: Type[Any], use_name: Optional[str] = None 

237) -> str: 

238 name = use_name or cls.__name__ 

239 return " ".join(n.lower() for n in re.findall(r"([A-Z][a-z]+|SQL)", name)) 

240 

241 

242def method_is_overridden( 

243 instance_or_cls: Union[Type[Any], object], 

244 against_method: Callable[..., Any], 

245) -> bool: 

246 """Return True if the two class methods don't match.""" 

247 

248 if not isinstance(instance_or_cls, type): 

249 current_cls = instance_or_cls.__class__ 

250 else: 

251 current_cls = instance_or_cls 

252 

253 method_name = against_method.__name__ 

254 

255 current_method: types.MethodType = getattr(current_cls, method_name) 

256 

257 return current_method != against_method 

258 

259 

260def decode_slice(slc: slice) -> Tuple[Any, ...]: 

261 """decode a slice object as sent to __getitem__. 

262 

263 takes into account the 2.5 __index__() method, basically. 

264 

265 """ 

266 ret: List[Any] = [] 

267 for x in slc.start, slc.stop, slc.step: 

268 if hasattr(x, "__index__"): 

269 x = x.__index__() 

270 ret.append(x) 

271 return tuple(ret) 

272 

273 

274def _unique_symbols(used: Sequence[str], *bases: str) -> Iterator[str]: 

275 used_set = set(used) 

276 for base in bases: 

277 pool = itertools.chain( 

278 (base,), 

279 map(lambda i: base + str(i), range(1000)), 

280 ) 

281 for sym in pool: 

282 if sym not in used_set: 

283 used_set.add(sym) 

284 yield sym 

285 break 

286 else: 

287 raise NameError("exhausted namespace for symbol base %s" % base) 

288 

289 

290def map_bits(fn: Callable[[int], Any], n: int) -> Iterator[Any]: 

291 """Call the given function given each nonzero bit from n.""" 

292 

293 while n: 

294 b = n & (~n + 1) 

295 yield fn(b) 

296 n ^= b 

297 

298 

299_Fn = TypeVar("_Fn", bound="Callable[..., Any]") 

300 

301# this seems to be in flux in recent mypy versions 

302 

303 

304def decorator(target: Callable[..., Any]) -> Callable[[_Fn], _Fn]: 

305 """A signature-matching decorator factory.""" 

306 

307 def decorate(fn: _Fn) -> _Fn: 

308 if not inspect.isfunction(fn) and not inspect.ismethod(fn): 

309 raise Exception("not a decoratable function") 

310 

311 # Python 3.14 defer creating __annotations__ until its used. 

312 # We do not want to create __annotations__ now. 

313 annofunc = getattr(fn, "__annotate__", None) 

314 if annofunc is not None: 

315 fn.__annotate__ = None # type: ignore[union-attr] 

316 try: 

317 spec = compat.inspect_getfullargspec(fn) 

318 finally: 

319 fn.__annotate__ = annofunc # type: ignore[union-attr] 

320 else: 

321 spec = compat.inspect_getfullargspec(fn) 

322 

323 # Do not generate code for annotations. 

324 # update_wrapper() copies the annotation from fn to decorated. 

325 # We use dummy defaults for code generation to avoid having 

326 # copy of large globals for compiling. 

327 # We copy __defaults__ and __kwdefaults__ from fn to decorated. 

328 empty_defaults = (None,) * len(spec.defaults or ()) 

329 empty_kwdefaults = dict.fromkeys(spec.kwonlydefaults or ()) 

330 spec = spec._replace( 

331 annotations={}, 

332 defaults=empty_defaults, 

333 kwonlydefaults=empty_kwdefaults, 

334 ) 

335 

336 names = ( 

337 tuple(cast("Tuple[str, ...]", spec[0])) 

338 + cast("Tuple[str, ...]", spec[1:3]) 

339 + (fn.__name__,) 

340 ) 

341 targ_name, fn_name = _unique_symbols(names, "target", "fn") 

342 

343 metadata: Dict[str, Optional[str]] = dict(target=targ_name, fn=fn_name) 

344 metadata.update(format_argspec_plus(spec, grouped=False)) 

345 metadata["name"] = fn.__name__ 

346 

347 if inspect.iscoroutinefunction(fn): 

348 metadata["prefix"] = "async " 

349 metadata["target_prefix"] = "await " 

350 else: 

351 metadata["prefix"] = "" 

352 metadata["target_prefix"] = "" 

353 

354 # look for __ positional arguments. This is a convention in 

355 # SQLAlchemy that arguments should be passed positionally 

356 # rather than as keyword 

357 # arguments. note that apply_pos doesn't currently work in all cases 

358 # such as when a kw-only indicator "*" is present, which is why 

359 # we limit the use of this to just that case we can detect. As we add 

360 # more kinds of methods that use @decorator, things may have to 

361 # be further improved in this area 

362 if "__" in repr(spec[0]): 

363 code = ( 

364 """\ 

365%(prefix)sdef %(name)s%(grouped_args)s: 

366 return %(target_prefix)s%(target)s(%(fn)s, %(apply_pos)s) 

367""" 

368 % metadata 

369 ) 

370 else: 

371 code = ( 

372 """\ 

373%(prefix)sdef %(name)s%(grouped_args)s: 

374 return %(target_prefix)s%(target)s(%(fn)s, %(apply_kw)s) 

375""" 

376 % metadata 

377 ) 

378 

379 env: Dict[str, Any] = { 

380 targ_name: target, 

381 fn_name: fn, 

382 "__name__": fn.__module__, 

383 } 

384 

385 decorated = cast( 

386 types.FunctionType, 

387 _exec_code_in_env(code, env, fn.__name__), 

388 ) 

389 decorated.__defaults__ = fn.__defaults__ 

390 decorated.__kwdefaults__ = fn.__kwdefaults__ # type: ignore 

391 return update_wrapper(decorated, fn) # type: ignore[return-value] 

392 

393 return update_wrapper(decorate, target) # type: ignore[return-value] 

394 

395 

396def _exec_code_in_env( 

397 code: Union[str, types.CodeType], env: Dict[str, Any], fn_name: str 

398) -> Callable[..., Any]: 

399 exec(code, env) 

400 return env[fn_name] # type: ignore[no-any-return] 

401 

402 

403_PF = TypeVar("_PF") 

404_TE = TypeVar("_TE") 

405 

406 

407class PluginLoader: 

408 def __init__( 

409 self, group: str, auto_fn: Optional[Callable[..., Any]] = None 

410 ): 

411 self.group = group 

412 self.impls: Dict[str, Any] = {} 

413 self.auto_fn = auto_fn 

414 

415 def clear(self): 

416 self.impls.clear() 

417 

418 def load(self, name: str) -> Any: 

419 if name in self.impls: 

420 return self.impls[name]() 

421 

422 if self.auto_fn: 

423 loader = self.auto_fn(name) 

424 if loader: 

425 self.impls[name] = loader 

426 return loader() 

427 

428 for impl in compat.importlib_metadata_get(self.group): 

429 if impl.name == name: 

430 self.impls[name] = impl.load 

431 return impl.load() 

432 

433 raise exc.NoSuchModuleError( 

434 "Can't load plugin: %s:%s" % (self.group, name) 

435 ) 

436 

437 def register(self, name: str, modulepath: str, objname: str) -> None: 

438 def load(): 

439 mod = __import__(modulepath) 

440 for token in modulepath.split(".")[1:]: 

441 mod = getattr(mod, token) 

442 return getattr(mod, objname) 

443 

444 self.impls[name] = load 

445 

446 def deregister(self, name: str) -> None: 

447 del self.impls[name] 

448 

449 

450def _inspect_func_args(fn): 

451 try: 

452 co_varkeywords = inspect.CO_VARKEYWORDS 

453 except AttributeError: 

454 # https://docs.python.org/3/library/inspect.html 

455 # The flags are specific to CPython, and may not be defined in other 

456 # Python implementations. Furthermore, the flags are an implementation 

457 # detail, and can be removed or deprecated in future Python releases. 

458 spec = compat.inspect_getfullargspec(fn) 

459 return spec[0], bool(spec[2]) 

460 else: 

461 # use fn.__code__ plus flags to reduce method call overhead 

462 co = fn.__code__ 

463 nargs = co.co_argcount 

464 return ( 

465 list(co.co_varnames[:nargs]), 

466 bool(co.co_flags & co_varkeywords), 

467 ) 

468 

469 

470@overload 

471def get_cls_kwargs( 

472 cls: type, 

473 *, 

474 _set: Optional[Set[str]] = None, 

475 raiseerr: Literal[True] = ..., 

476) -> Set[str]: ... 

477 

478 

479@overload 

480def get_cls_kwargs( 

481 cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False 

482) -> Optional[Set[str]]: ... 

483 

484 

485def get_cls_kwargs( 

486 cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False 

487) -> Optional[Set[str]]: 

488 r"""Return the full set of inherited kwargs for the given `cls`. 

489 

490 Probes a class's __init__ method, collecting all named arguments. If the 

491 __init__ defines a \**kwargs catch-all, then the constructor is presumed 

492 to pass along unrecognized keywords to its base classes, and the 

493 collection process is repeated recursively on each of the bases. 

494 

495 Uses a subset of inspect.getfullargspec() to cut down on method overhead, 

496 as this is used within the Core typing system to create copies of type 

497 objects which is a performance-sensitive operation. 

498 

499 No anonymous tuple arguments please ! 

500 

501 """ 

502 toplevel = _set is None 

503 if toplevel: 

504 _set = set() 

505 assert _set is not None 

506 

507 ctr = cls.__dict__.get("__init__", False) 

508 

509 has_init = ( 

510 ctr 

511 and isinstance(ctr, types.FunctionType) 

512 and isinstance(ctr.__code__, types.CodeType) 

513 ) 

514 

515 if has_init: 

516 names, has_kw = _inspect_func_args(ctr) 

517 _set.update(names) 

518 

519 if not has_kw and not toplevel: 

520 if raiseerr: 

521 raise TypeError( 

522 f"given cls {cls} doesn't have an __init__ method" 

523 ) 

524 else: 

525 return None 

526 else: 

527 has_kw = False 

528 

529 if not has_init or has_kw: 

530 for c in cls.__bases__: 

531 if get_cls_kwargs(c, _set=_set) is None: 

532 break 

533 

534 _set.discard("self") 

535 return _set 

536 

537 

538def get_func_kwargs(func: Callable[..., Any]) -> List[str]: 

539 """Return the set of legal kwargs for the given `func`. 

540 

541 Uses getargspec so is safe to call for methods, functions, 

542 etc. 

543 

544 """ 

545 

546 return compat.inspect_getfullargspec(func)[0] 

547 

548 

549def get_callable_argspec( 

550 fn: Callable[..., Any], no_self: bool = False, _is_init: bool = False 

551) -> compat.FullArgSpec: 

552 """Return the argument signature for any callable. 

553 

554 All pure-Python callables are accepted, including 

555 functions, methods, classes, objects with __call__; 

556 builtins and other edge cases like functools.partial() objects 

557 raise a TypeError. 

558 

559 """ 

560 if inspect.isbuiltin(fn): 

561 raise TypeError("Can't inspect builtin: %s" % fn) 

562 elif inspect.isfunction(fn): 

563 if _is_init and no_self: 

564 spec = compat.inspect_getfullargspec(fn) 

565 return compat.FullArgSpec( 

566 spec.args[1:], 

567 spec.varargs, 

568 spec.varkw, 

569 spec.defaults, 

570 spec.kwonlyargs, 

571 spec.kwonlydefaults, 

572 spec.annotations, 

573 ) 

574 else: 

575 return compat.inspect_getfullargspec(fn) 

576 elif inspect.ismethod(fn): 

577 if no_self and (_is_init or fn.__self__): 

578 spec = compat.inspect_getfullargspec(fn.__func__) 

579 return compat.FullArgSpec( 

580 spec.args[1:], 

581 spec.varargs, 

582 spec.varkw, 

583 spec.defaults, 

584 spec.kwonlyargs, 

585 spec.kwonlydefaults, 

586 spec.annotations, 

587 ) 

588 else: 

589 return compat.inspect_getfullargspec(fn.__func__) 

590 elif inspect.isclass(fn): 

591 return get_callable_argspec( 

592 fn.__init__, no_self=no_self, _is_init=True 

593 ) 

594 elif hasattr(fn, "__func__"): 

595 return compat.inspect_getfullargspec(fn.__func__) 

596 elif hasattr(fn, "__call__"): 

597 if inspect.ismethod(fn.__call__): 

598 return get_callable_argspec(fn.__call__, no_self=no_self) 

599 else: 

600 raise TypeError("Can't inspect callable: %s" % fn) 

601 else: 

602 raise TypeError("Can't inspect callable: %s" % fn) 

603 

604 

605def format_argspec_plus( 

606 fn: Union[Callable[..., Any], compat.FullArgSpec], grouped: bool = True 

607) -> Dict[str, Optional[str]]: 

608 """Returns a dictionary of formatted, introspected function arguments. 

609 

610 A enhanced variant of inspect.formatargspec to support code generation. 

611 

612 fn 

613 An inspectable callable or tuple of inspect getargspec() results. 

614 grouped 

615 Defaults to True; include (parens, around, argument) lists 

616 

617 Returns: 

618 

619 args 

620 Full inspect.formatargspec for fn 

621 self_arg 

622 The name of the first positional argument, varargs[0], or None 

623 if the function defines no positional arguments. 

624 apply_pos 

625 args, re-written in calling rather than receiving syntax. Arguments are 

626 passed positionally. 

627 apply_kw 

628 Like apply_pos, except keyword-ish args are passed as keywords. 

629 apply_pos_proxied 

630 Like apply_pos but omits the self/cls argument 

631 

632 Example:: 

633 

634 >>> format_argspec_plus(lambda self, a, b, c=3, **d: 123) 

635 {'grouped_args': '(self, a, b, c=3, **d)', 

636 'self_arg': 'self', 

637 'apply_kw': '(self, a, b, c=c, **d)', 

638 'apply_pos': '(self, a, b, c, **d)'} 

639 

640 """ 

641 if callable(fn): 

642 spec = compat.inspect_getfullargspec(fn) 

643 else: 

644 spec = fn 

645 

646 args = compat.inspect_formatargspec(*spec) 

647 

648 apply_pos = compat.inspect_formatargspec( 

649 spec[0], spec[1], spec[2], None, spec[4] 

650 ) 

651 

652 if spec[0]: 

653 self_arg = spec[0][0] 

654 

655 apply_pos_proxied = compat.inspect_formatargspec( 

656 spec[0][1:], spec[1], spec[2], None, spec[4] 

657 ) 

658 

659 elif spec[1]: 

660 # I'm not sure what this is 

661 self_arg = "%s[0]" % spec[1] 

662 

663 apply_pos_proxied = apply_pos 

664 else: 

665 self_arg = None 

666 apply_pos_proxied = apply_pos 

667 

668 num_defaults = 0 

669 if spec[3]: 

670 num_defaults += len(cast(Tuple[Any], spec[3])) 

671 if spec[4]: 

672 num_defaults += len(spec[4]) 

673 

674 name_args = spec[0] + spec[4] 

675 

676 defaulted_vals: Union[List[str], Tuple[()]] 

677 

678 if num_defaults: 

679 defaulted_vals = name_args[0 - num_defaults :] 

680 else: 

681 defaulted_vals = () 

682 

683 apply_kw = compat.inspect_formatargspec( 

684 name_args, 

685 spec[1], 

686 spec[2], 

687 defaulted_vals, 

688 formatvalue=lambda x: "=" + str(x), 

689 ) 

690 

691 if spec[0]: 

692 apply_kw_proxied = compat.inspect_formatargspec( 

693 name_args[1:], 

694 spec[1], 

695 spec[2], 

696 defaulted_vals, 

697 formatvalue=lambda x: "=" + str(x), 

698 ) 

699 else: 

700 apply_kw_proxied = apply_kw 

701 

702 if grouped: 

703 return dict( 

704 grouped_args=args, 

705 self_arg=self_arg, 

706 apply_pos=apply_pos, 

707 apply_kw=apply_kw, 

708 apply_pos_proxied=apply_pos_proxied, 

709 apply_kw_proxied=apply_kw_proxied, 

710 ) 

711 else: 

712 return dict( 

713 grouped_args=args, 

714 self_arg=self_arg, 

715 apply_pos=apply_pos[1:-1], 

716 apply_kw=apply_kw[1:-1], 

717 apply_pos_proxied=apply_pos_proxied[1:-1], 

718 apply_kw_proxied=apply_kw_proxied[1:-1], 

719 ) 

720 

721 

722def format_argspec_init(method, grouped=True): 

723 """format_argspec_plus with considerations for typical __init__ methods 

724 

725 Wraps format_argspec_plus with error handling strategies for typical 

726 __init__ cases: 

727 

728 .. sourcecode:: text 

729 

730 object.__init__ -> (self) 

731 other unreflectable (usually C) -> (self, *args, **kwargs) 

732 

733 """ 

734 if method is object.__init__: 

735 grouped_args = "(self)" 

736 args = "(self)" if grouped else "self" 

737 proxied = "()" if grouped else "" 

738 else: 

739 try: 

740 return format_argspec_plus(method, grouped=grouped) 

741 except TypeError: 

742 grouped_args = "(self, *args, **kwargs)" 

743 args = grouped_args if grouped else "self, *args, **kwargs" 

744 proxied = "(*args, **kwargs)" if grouped else "*args, **kwargs" 

745 return dict( 

746 self_arg="self", 

747 grouped_args=grouped_args, 

748 apply_pos=args, 

749 apply_kw=args, 

750 apply_pos_proxied=proxied, 

751 apply_kw_proxied=proxied, 

752 ) 

753 

754 

755def create_proxy_methods( 

756 target_cls: Type[Any], 

757 target_cls_sphinx_name: str, 

758 proxy_cls_sphinx_name: str, 

759 classmethods: Sequence[str] = (), 

760 methods: Sequence[str] = (), 

761 attributes: Sequence[str] = (), 

762 use_intermediate_variable: Sequence[str] = (), 

763) -> Callable[[_T], _T]: 

764 """A class decorator indicating attributes should refer to a proxy 

765 class. 

766 

767 This decorator is now a "marker" that does nothing at runtime. Instead, 

768 it is consumed by the tools/generate_proxy_methods.py script to 

769 statically generate proxy methods and attributes that are fully 

770 recognized by typing tools such as mypy. 

771 

772 """ 

773 

774 def decorate(cls): 

775 return cls 

776 

777 return decorate 

778 

779 

780def getargspec_init(method): 

781 """inspect.getargspec with considerations for typical __init__ methods 

782 

783 Wraps inspect.getargspec with error handling for typical __init__ cases: 

784 

785 .. sourcecode:: text 

786 

787 object.__init__ -> (self) 

788 other unreflectable (usually C) -> (self, *args, **kwargs) 

789 

790 """ 

791 try: 

792 return compat.inspect_getfullargspec(method) 

793 except TypeError: 

794 if method is object.__init__: 

795 return (["self"], None, None, None) 

796 else: 

797 return (["self"], "args", "kwargs", None) 

798 

799 

800def unbound_method_to_callable(func_or_cls): 

801 """Adjust the incoming callable such that a 'self' argument is not 

802 required. 

803 

804 """ 

805 

806 if isinstance(func_or_cls, types.MethodType) and not func_or_cls.__self__: 

807 return func_or_cls.__func__ 

808 else: 

809 return func_or_cls 

810 

811 

812def generic_repr( 

813 obj: Any, 

814 additional_kw: Sequence[Tuple[str, Any]] = (), 

815 to_inspect: Optional[Union[object, List[object]]] = None, 

816 omit_kwarg: Sequence[str] = (), 

817) -> str: 

818 """Produce a __repr__() based on direct association of the __init__() 

819 specification vs. same-named attributes present. 

820 

821 """ 

822 if to_inspect is None: 

823 to_inspect = [obj] 

824 else: 

825 to_inspect = _collections.to_list(to_inspect) 

826 

827 missing = object() 

828 

829 pos_args = [] 

830 kw_args: _collections.OrderedDict[str, Any] = _collections.OrderedDict() 

831 vargs = None 

832 for i, insp in enumerate(to_inspect): 

833 try: 

834 spec = compat.inspect_getfullargspec(insp.__init__) 

835 except TypeError: 

836 continue 

837 else: 

838 default_len = len(spec.defaults) if spec.defaults else 0 

839 if i == 0: 

840 if spec.varargs: 

841 vargs = spec.varargs 

842 if default_len: 

843 pos_args.extend(spec.args[1:-default_len]) 

844 else: 

845 pos_args.extend(spec.args[1:]) 

846 else: 

847 kw_args.update( 

848 [(arg, missing) for arg in spec.args[1:-default_len]] 

849 ) 

850 

851 if default_len: 

852 assert spec.defaults 

853 kw_args.update( 

854 [ 

855 (arg, default) 

856 for arg, default in zip( 

857 spec.args[-default_len:], spec.defaults 

858 ) 

859 ] 

860 ) 

861 output: List[str] = [] 

862 

863 output.extend(repr(getattr(obj, arg, None)) for arg in pos_args) 

864 

865 if vargs is not None and hasattr(obj, vargs): 

866 output.extend([repr(val) for val in getattr(obj, vargs)]) 

867 

868 for arg, defval in kw_args.items(): 

869 if arg in omit_kwarg: 

870 continue 

871 try: 

872 val = getattr(obj, arg, missing) 

873 if val is not missing and val != defval: 

874 output.append("%s=%r" % (arg, val)) 

875 except Exception: 

876 pass 

877 

878 if additional_kw: 

879 for arg, defval in additional_kw: 

880 try: 

881 val = getattr(obj, arg, missing) 

882 if val is not missing and val != defval: 

883 output.append("%s=%r" % (arg, val)) 

884 except Exception: 

885 pass 

886 

887 return "%s(%s)" % (obj.__class__.__name__, ", ".join(output)) 

888 

889 

890def class_hierarchy(cls): 

891 """Return an unordered sequence of all classes related to cls. 

892 

893 Traverses diamond hierarchies. 

894 

895 Fibs slightly: subclasses of builtin types are not returned. Thus 

896 class_hierarchy(class A(object)) returns (A, object), not A plus every 

897 class systemwide that derives from object. 

898 

899 """ 

900 

901 hier = {cls} 

902 process = list(cls.__mro__) 

903 while process: 

904 c = process.pop() 

905 bases = (_ for _ in c.__bases__ if _ not in hier) 

906 

907 for b in bases: 

908 process.append(b) 

909 hier.add(b) 

910 

911 if c.__module__ == "builtins" or not hasattr(c, "__subclasses__"): 

912 continue 

913 

914 for s in [ 

915 _ 

916 for _ in ( 

917 c.__subclasses__() 

918 if not issubclass(c, type) 

919 else c.__subclasses__(c) 

920 ) 

921 if _ not in hier 

922 ]: 

923 process.append(s) 

924 hier.add(s) 

925 return list(hier) 

926 

927 

928def iterate_attributes(cls): 

929 """iterate all the keys and attributes associated 

930 with a class, without using getattr(). 

931 

932 Does not use getattr() so that class-sensitive 

933 descriptors (i.e. property.__get__()) are not called. 

934 

935 """ 

936 keys = dir(cls) 

937 for key in keys: 

938 for c in cls.__mro__: 

939 if key in c.__dict__: 

940 yield (key, c.__dict__[key]) 

941 break 

942 

943 

944def monkeypatch_proxied_specials( 

945 into_cls, 

946 from_cls, 

947 skip=None, 

948 only=None, 

949 name="self.proxy", 

950 from_instance=None, 

951): 

952 """Automates delegation of __specials__ for a proxying type.""" 

953 

954 if only: 

955 dunders = only 

956 else: 

957 if skip is None: 

958 skip = ( 

959 "__slots__", 

960 "__del__", 

961 "__getattribute__", 

962 "__metaclass__", 

963 "__getstate__", 

964 "__setstate__", 

965 ) 

966 dunders = [ 

967 m 

968 for m in dir(from_cls) 

969 if ( 

970 m.startswith("__") 

971 and m.endswith("__") 

972 and not hasattr(into_cls, m) 

973 and m not in skip 

974 ) 

975 ] 

976 

977 for method in dunders: 

978 try: 

979 maybe_fn = getattr(from_cls, method) 

980 if not hasattr(maybe_fn, "__call__"): 

981 continue 

982 maybe_fn = getattr(maybe_fn, "__func__", maybe_fn) 

983 fn = cast(types.FunctionType, maybe_fn) 

984 

985 except AttributeError: 

986 continue 

987 try: 

988 spec = compat.inspect_getfullargspec(fn) 

989 fn_args = compat.inspect_formatargspec(spec[0]) 

990 d_args = compat.inspect_formatargspec(spec[0][1:]) 

991 except TypeError: 

992 fn_args = "(self, *args, **kw)" 

993 d_args = "(*args, **kw)" 

994 

995 py = ( 

996 "def %(method)s%(fn_args)s: " 

997 "return %(name)s.%(method)s%(d_args)s" % locals() 

998 ) 

999 

1000 env: Dict[str, types.FunctionType] = ( 

1001 from_instance is not None and {name: from_instance} or {} 

1002 ) 

1003 exec(py, env) 

1004 try: 

1005 env[method].__defaults__ = fn.__defaults__ 

1006 except AttributeError: 

1007 pass 

1008 setattr(into_cls, method, env[method]) 

1009 

1010 

1011def methods_equivalent(meth1, meth2): 

1012 """Return True if the two methods are the same implementation.""" 

1013 

1014 return getattr(meth1, "__func__", meth1) is getattr( 

1015 meth2, "__func__", meth2 

1016 ) 

1017 

1018 

1019def as_interface(obj, cls=None, methods=None, required=None): 

1020 """Ensure basic interface compliance for an instance or dict of callables. 

1021 

1022 Checks that ``obj`` implements public methods of ``cls`` or has members 

1023 listed in ``methods``. If ``required`` is not supplied, implementing at 

1024 least one interface method is sufficient. Methods present on ``obj`` that 

1025 are not in the interface are ignored. 

1026 

1027 If ``obj`` is a dict and ``dict`` does not meet the interface 

1028 requirements, the keys of the dictionary are inspected. Keys present in 

1029 ``obj`` that are not in the interface will raise TypeErrors. 

1030 

1031 Raises TypeError if ``obj`` does not meet the interface criteria. 

1032 

1033 In all passing cases, an object with callable members is returned. In the 

1034 simple case, ``obj`` is returned as-is; if dict processing kicks in then 

1035 an anonymous class is returned. 

1036 

1037 obj 

1038 A type, instance, or dictionary of callables. 

1039 cls 

1040 Optional, a type. All public methods of cls are considered the 

1041 interface. An ``obj`` instance of cls will always pass, ignoring 

1042 ``required``.. 

1043 methods 

1044 Optional, a sequence of method names to consider as the interface. 

1045 required 

1046 Optional, a sequence of mandatory implementations. If omitted, an 

1047 ``obj`` that provides at least one interface method is considered 

1048 sufficient. As a convenience, required may be a type, in which case 

1049 all public methods of the type are required. 

1050 

1051 """ 

1052 if not cls and not methods: 

1053 raise TypeError("a class or collection of method names are required") 

1054 

1055 if isinstance(cls, type) and isinstance(obj, cls): 

1056 return obj 

1057 

1058 interface = set(methods or [m for m in dir(cls) if not m.startswith("_")]) 

1059 implemented = set(dir(obj)) 

1060 

1061 complies = operator.ge 

1062 if isinstance(required, type): 

1063 required = interface 

1064 elif not required: 

1065 required = set() 

1066 complies = operator.gt 

1067 else: 

1068 required = set(required) 

1069 

1070 if complies(implemented.intersection(interface), required): 

1071 return obj 

1072 

1073 # No dict duck typing here. 

1074 if not isinstance(obj, dict): 

1075 qualifier = complies is operator.gt and "any of" or "all of" 

1076 raise TypeError( 

1077 "%r does not implement %s: %s" 

1078 % (obj, qualifier, ", ".join(interface)) 

1079 ) 

1080 

1081 class AnonymousInterface: 

1082 """A callable-holding shell.""" 

1083 

1084 if cls: 

1085 AnonymousInterface.__name__ = "Anonymous" + cls.__name__ 

1086 found = set() 

1087 

1088 for method, impl in dictlike_iteritems(obj): 

1089 if method not in interface: 

1090 raise TypeError("%r: unknown in this interface" % method) 

1091 if not callable(impl): 

1092 raise TypeError("%r=%r is not callable" % (method, impl)) 

1093 setattr(AnonymousInterface, method, staticmethod(impl)) 

1094 found.add(method) 

1095 

1096 if complies(found, required): 

1097 return AnonymousInterface 

1098 

1099 raise TypeError( 

1100 "dictionary does not contain required keys %s" 

1101 % ", ".join(required - found) 

1102 ) 

1103 

1104 

1105_GFD = TypeVar("_GFD", bound="generic_fn_descriptor[Any]") 

1106 

1107 

1108class generic_fn_descriptor(Generic[_T_co]): 

1109 """Descriptor which proxies a function when the attribute is not 

1110 present in dict 

1111 

1112 This superclass is organized in a particular way with "memoized" and 

1113 "non-memoized" implementation classes that are hidden from type checkers, 

1114 as Mypy seems to not be able to handle seeing multiple kinds of descriptor 

1115 classes used for the same attribute. 

1116 

1117 """ 

1118 

1119 fget: Callable[..., _T_co] 

1120 __doc__: Optional[str] 

1121 __name__: str 

1122 

1123 def __init__(self, fget: Callable[..., _T_co], doc: Optional[str] = None): 

1124 self.fget = fget 

1125 self.__doc__ = doc or fget.__doc__ 

1126 self.__name__ = fget.__name__ 

1127 

1128 @overload 

1129 def __get__(self: _GFD, obj: None, cls: Any) -> _GFD: ... 

1130 

1131 @overload 

1132 def __get__(self, obj: object, cls: Any) -> _T_co: ... 

1133 

1134 def __get__(self: _GFD, obj: Any, cls: Any) -> Union[_GFD, _T_co]: 

1135 raise NotImplementedError() 

1136 

1137 if TYPE_CHECKING: 

1138 

1139 def __set__(self, instance: Any, value: Any) -> None: ... 

1140 

1141 def __delete__(self, instance: Any) -> None: ... 

1142 

1143 def _reset(self, obj: Any) -> None: 

1144 raise NotImplementedError() 

1145 

1146 @classmethod 

1147 def reset(cls, obj: Any, name: str) -> None: 

1148 raise NotImplementedError() 

1149 

1150 

1151class _non_memoized_property(generic_fn_descriptor[_T_co]): 

1152 """a plain descriptor that proxies a function. 

1153 

1154 primary rationale is to provide a plain attribute that's 

1155 compatible with memoized_property which is also recognized as equivalent 

1156 by mypy. 

1157 

1158 """ 

1159 

1160 if not TYPE_CHECKING: 

1161 

1162 def __get__(self, obj, cls): 

1163 if obj is None: 

1164 return self 

1165 return self.fget(obj) 

1166 

1167 

1168class _memoized_property(generic_fn_descriptor[_T_co]): 

1169 """A read-only @property that is only evaluated once.""" 

1170 

1171 if not TYPE_CHECKING: 

1172 

1173 def __get__(self, obj, cls): 

1174 if obj is None: 

1175 return self 

1176 obj.__dict__[self.__name__] = result = self.fget(obj) 

1177 return result 

1178 

1179 def _reset(self, obj): 

1180 _memoized_property.reset(obj, self.__name__) 

1181 

1182 @classmethod 

1183 def reset(cls, obj, name): 

1184 obj.__dict__.pop(name, None) 

1185 

1186 

1187# despite many attempts to get Mypy to recognize an overridden descriptor 

1188# where one is memoized and the other isn't, there seems to be no reliable 

1189# way other than completely deceiving the type checker into thinking there 

1190# is just one single descriptor type everywhere. Otherwise, if a superclass 

1191# has non-memoized and subclass has memoized, that requires 

1192# "class memoized(non_memoized)". but then if a superclass has memoized and 

1193# superclass has non-memoized, the class hierarchy of the descriptors 

1194# would need to be reversed; "class non_memoized(memoized)". so there's no 

1195# way to achieve this. 

1196# additional issues, RO properties: 

1197# https://github.com/python/mypy/issues/12440 

1198if TYPE_CHECKING: 

1199 # allow memoized and non-memoized to be freely mixed by having them 

1200 # be the same class 

1201 memoized_property = generic_fn_descriptor 

1202 non_memoized_property = generic_fn_descriptor 

1203 

1204 # for read only situations, mypy only sees @property as read only. 

1205 # read only is needed when a subtype specializes the return type 

1206 # of a property, meaning assignment needs to be disallowed 

1207 ro_memoized_property = property 

1208 ro_non_memoized_property = property 

1209 

1210else: 

1211 memoized_property = ro_memoized_property = _memoized_property 

1212 non_memoized_property = ro_non_memoized_property = _non_memoized_property 

1213 

1214 

1215def memoized_instancemethod(fn: _F) -> _F: 

1216 """Decorate a method memoize its return value. 

1217 

1218 Best applied to no-arg methods: memoization is not sensitive to 

1219 argument values, and will always return the same value even when 

1220 called with different arguments. 

1221 

1222 """ 

1223 

1224 def oneshot(self, *args, **kw): 

1225 result = fn(self, *args, **kw) 

1226 

1227 def memo(*a, **kw): 

1228 return result 

1229 

1230 memo.__name__ = fn.__name__ 

1231 memo.__doc__ = fn.__doc__ 

1232 self.__dict__[fn.__name__] = memo 

1233 return result 

1234 

1235 return update_wrapper(oneshot, fn) # type: ignore 

1236 

1237 

1238class HasMemoized: 

1239 """A mixin class that maintains the names of memoized elements in a 

1240 collection for easy cache clearing, generative, etc. 

1241 

1242 """ 

1243 

1244 if not TYPE_CHECKING: 

1245 # support classes that want to have __slots__ with an explicit 

1246 # slot for __dict__. not sure if that requires base __slots__ here. 

1247 __slots__ = () 

1248 

1249 _memoized_keys: FrozenSet[str] = frozenset() 

1250 

1251 def _reset_memoizations(self) -> None: 

1252 for elem in self._memoized_keys: 

1253 self.__dict__.pop(elem, None) 

1254 

1255 def _assert_no_memoizations(self) -> None: 

1256 for elem in self._memoized_keys: 

1257 assert elem not in self.__dict__ 

1258 

1259 def _set_memoized_attribute(self, key: str, value: Any) -> None: 

1260 self.__dict__[key] = value 

1261 self._memoized_keys |= {key} 

1262 

1263 class memoized_attribute(memoized_property[_T]): 

1264 """A read-only @property that is only evaluated once. 

1265 

1266 :meta private: 

1267 

1268 """ 

1269 

1270 fget: Callable[..., _T] 

1271 __doc__: Optional[str] 

1272 __name__: str 

1273 

1274 def __init__(self, fget: Callable[..., _T], doc: Optional[str] = None): 

1275 self.fget = fget 

1276 self.__doc__ = doc or fget.__doc__ 

1277 self.__name__ = fget.__name__ 

1278 

1279 @overload 

1280 def __get__(self: _MA, obj: None, cls: Any) -> _MA: ... 

1281 

1282 @overload 

1283 def __get__(self, obj: Any, cls: Any) -> _T: ... 

1284 

1285 def __get__(self, obj, cls): 

1286 if obj is None: 

1287 return self 

1288 obj.__dict__[self.__name__] = result = self.fget(obj) 

1289 obj._memoized_keys |= {self.__name__} 

1290 return result 

1291 

1292 @classmethod 

1293 def memoized_instancemethod(cls, fn: _F) -> _F: 

1294 """Decorate a method memoize its return value. 

1295 

1296 :meta private: 

1297 

1298 """ 

1299 

1300 def oneshot(self: Any, *args: Any, **kw: Any) -> Any: 

1301 result = fn(self, *args, **kw) 

1302 

1303 def memo(*a, **kw): 

1304 return result 

1305 

1306 memo.__name__ = fn.__name__ 

1307 memo.__doc__ = fn.__doc__ 

1308 self.__dict__[fn.__name__] = memo 

1309 self._memoized_keys |= {fn.__name__} 

1310 return result 

1311 

1312 return update_wrapper(oneshot, fn) # type: ignore 

1313 

1314 

1315if TYPE_CHECKING: 

1316 HasMemoized_ro_memoized_attribute = property 

1317else: 

1318 HasMemoized_ro_memoized_attribute = HasMemoized.memoized_attribute 

1319 

1320 

1321class MemoizedSlots: 

1322 """Apply memoized items to an object using a __getattr__ scheme. 

1323 

1324 This allows the functionality of memoized_property and 

1325 memoized_instancemethod to be available to a class using __slots__. 

1326 

1327 The memoized get is not threadsafe under freethreading and the 

1328 creator method may in extremely rare cases be called more than once. 

1329 

1330 """ 

1331 

1332 __slots__ = () 

1333 

1334 def _fallback_getattr(self, key): 

1335 raise AttributeError(key) 

1336 

1337 def __getattr__(self, key: str) -> Any: 

1338 if key.startswith("_memoized_attr_") or key.startswith( 

1339 "_memoized_method_" 

1340 ): 

1341 raise AttributeError(key) 

1342 # to avoid recursion errors when interacting with other __getattr__ 

1343 # schemes that refer to this one, when testing for memoized method 

1344 # look at __class__ only rather than going into __getattr__ again. 

1345 elif hasattr(self.__class__, f"_memoized_attr_{key}"): 

1346 value = getattr(self, f"_memoized_attr_{key}")() 

1347 setattr(self, key, value) 

1348 return value 

1349 elif hasattr(self.__class__, f"_memoized_method_{key}"): 

1350 meth = getattr(self, f"_memoized_method_{key}") 

1351 

1352 def oneshot(*args, **kw): 

1353 result = meth(*args, **kw) 

1354 

1355 def memo(*a, **kw): 

1356 return result 

1357 

1358 memo.__name__ = meth.__name__ 

1359 memo.__doc__ = meth.__doc__ 

1360 setattr(self, key, memo) 

1361 return result 

1362 

1363 oneshot.__doc__ = meth.__doc__ 

1364 return oneshot 

1365 else: 

1366 return self._fallback_getattr(key) 

1367 

1368 

1369# from paste.deploy.converters 

1370def asbool(obj: Any) -> bool: 

1371 if isinstance(obj, str): 

1372 obj = obj.strip().lower() 

1373 if obj in ["true", "yes", "on", "y", "t", "1"]: 

1374 return True 

1375 elif obj in ["false", "no", "off", "n", "f", "0"]: 

1376 return False 

1377 else: 

1378 raise ValueError("String is not true/false: %r" % obj) 

1379 return bool(obj) 

1380 

1381 

1382def bool_or_str(*text: str) -> Callable[[str], Union[str, bool]]: 

1383 """Return a callable that will evaluate a string as 

1384 boolean, or one of a set of "alternate" string values. 

1385 

1386 """ 

1387 

1388 def bool_or_value(obj: str) -> Union[str, bool]: 

1389 if obj in text: 

1390 return obj 

1391 else: 

1392 return asbool(obj) 

1393 

1394 return bool_or_value 

1395 

1396 

1397def asint(value: Any) -> Optional[int]: 

1398 """Coerce to integer.""" 

1399 

1400 if value is None: 

1401 return value 

1402 return int(value) 

1403 

1404 

1405def coerce_kw_type( 

1406 kw: Dict[str, Any], 

1407 key: str, 

1408 type_: Type[Any], 

1409 flexi_bool: bool = True, 

1410 dest: Optional[Dict[str, Any]] = None, 

1411) -> None: 

1412 r"""If 'key' is present in dict 'kw', coerce its value to type 'type\_' if 

1413 necessary. If 'flexi_bool' is True, the string '0' is considered false 

1414 when coercing to boolean. 

1415 """ 

1416 

1417 if dest is None: 

1418 dest = kw 

1419 

1420 if ( 

1421 key in kw 

1422 and (not isinstance(type_, type) or not isinstance(kw[key], type_)) 

1423 and kw[key] is not None 

1424 ): 

1425 if type_ is bool and flexi_bool: 

1426 dest[key] = asbool(kw[key]) 

1427 else: 

1428 dest[key] = type_(kw[key]) 

1429 

1430 

1431def constructor_key(obj: Any, cls: Type[Any]) -> Tuple[Any, ...]: 

1432 """Produce a tuple structure that is cacheable using the __dict__ of 

1433 obj to retrieve values 

1434 

1435 """ 

1436 names = get_cls_kwargs(cls) 

1437 return (cls,) + tuple( 

1438 (k, obj.__dict__[k]) for k in names if k in obj.__dict__ 

1439 ) 

1440 

1441 

1442def constructor_copy(obj: _T, cls: Type[_T], *args: Any, **kw: Any) -> _T: 

1443 """Instantiate cls using the __dict__ of obj as constructor arguments. 

1444 

1445 Uses inspect to match the named arguments of ``cls``. 

1446 

1447 """ 

1448 

1449 names = get_cls_kwargs(cls) 

1450 kw.update( 

1451 (k, obj.__dict__[k]) for k in names.difference(kw) if k in obj.__dict__ 

1452 ) 

1453 return cls(*args, **kw) 

1454 

1455 

1456def counter() -> Callable[[], int]: 

1457 """Return a threadsafe counter function.""" 

1458 

1459 lock = threading.Lock() 

1460 counter = itertools.count(1) 

1461 

1462 # avoid the 2to3 "next" transformation... 

1463 def _next(): 

1464 with lock: 

1465 return next(counter) 

1466 

1467 return _next 

1468 

1469 

1470def duck_type_collection( 

1471 specimen: Any, default: Optional[Type[Any]] = None 

1472) -> Optional[Type[Any]]: 

1473 """Given an instance or class, guess if it is or is acting as one of 

1474 the basic collection types: list, set and dict. If the __emulates__ 

1475 property is present, return that preferentially. 

1476 """ 

1477 

1478 if hasattr(specimen, "__emulates__"): 

1479 # canonicalize set vs sets.Set to a standard: the builtin set 

1480 if specimen.__emulates__ is not None and issubclass( 

1481 specimen.__emulates__, set 

1482 ): 

1483 return set 

1484 else: 

1485 return specimen.__emulates__ # type: ignore 

1486 

1487 isa = issubclass if isinstance(specimen, type) else isinstance 

1488 if isa(specimen, list): 

1489 return list 

1490 elif isa(specimen, set): 

1491 return set 

1492 elif isa(specimen, dict): 

1493 return dict 

1494 

1495 if hasattr(specimen, "append"): 

1496 return list 

1497 elif hasattr(specimen, "add"): 

1498 return set 

1499 elif hasattr(specimen, "set"): 

1500 return dict 

1501 else: 

1502 return default 

1503 

1504 

1505def assert_arg_type( 

1506 arg: Any, argtype: Union[Tuple[Type[Any], ...], Type[Any]], name: str 

1507) -> Any: 

1508 if isinstance(arg, argtype): 

1509 return arg 

1510 else: 

1511 if isinstance(argtype, tuple): 

1512 raise exc.ArgumentError( 

1513 "Argument '%s' is expected to be one of type %s, got '%s'" 

1514 % (name, " or ".join("'%s'" % a for a in argtype), type(arg)) 

1515 ) 

1516 else: 

1517 raise exc.ArgumentError( 

1518 "Argument '%s' is expected to be of type '%s', got '%s'" 

1519 % (name, argtype, type(arg)) 

1520 ) 

1521 

1522 

1523def dictlike_iteritems(dictlike): 

1524 """Return a (key, value) iterator for almost any dict-like object.""" 

1525 

1526 if hasattr(dictlike, "items"): 

1527 return list(dictlike.items()) 

1528 

1529 getter = getattr(dictlike, "__getitem__", getattr(dictlike, "get", None)) 

1530 if getter is None: 

1531 raise TypeError("Object '%r' is not dict-like" % dictlike) 

1532 

1533 if hasattr(dictlike, "iterkeys"): 

1534 

1535 def iterator(): 

1536 for key in dictlike.iterkeys(): 

1537 assert getter is not None 

1538 yield key, getter(key) 

1539 

1540 return iterator() 

1541 elif hasattr(dictlike, "keys"): 

1542 return iter((key, getter(key)) for key in dictlike.keys()) 

1543 else: 

1544 raise TypeError("Object '%r' is not dict-like" % dictlike) 

1545 

1546 

1547class classproperty(property): 

1548 """A decorator that behaves like @property except that operates 

1549 on classes rather than instances. 

1550 

1551 The decorator is currently special when using the declarative 

1552 module, but note that the 

1553 :class:`~.sqlalchemy.ext.declarative.declared_attr` 

1554 decorator should be used for this purpose with declarative. 

1555 

1556 """ 

1557 

1558 fget: Callable[[Any], Any] 

1559 

1560 def __init__(self, fget: Callable[[Any], Any], *arg: Any, **kw: Any): 

1561 super().__init__(fget, *arg, **kw) 

1562 self.__doc__ = fget.__doc__ 

1563 

1564 def __get__(self, obj: Any, cls: Optional[type] = None) -> Any: 

1565 return self.fget(cls) 

1566 

1567 

1568class hybridproperty(Generic[_T]): 

1569 def __init__(self, func: Callable[..., _T]): 

1570 self.func = func 

1571 self.clslevel = func 

1572 

1573 def __get__(self, instance: Any, owner: Any) -> _T: 

1574 if instance is None: 

1575 clsval = self.clslevel(owner) 

1576 return clsval 

1577 else: 

1578 return self.func(instance) 

1579 

1580 def classlevel(self, func: Callable[..., Any]) -> hybridproperty[_T]: 

1581 self.clslevel = func 

1582 return self 

1583 

1584 

1585class rw_hybridproperty(Generic[_T]): 

1586 def __init__(self, func: Callable[..., _T]): 

1587 self.func = func 

1588 self.clslevel = func 

1589 self.setfn: Optional[Callable[..., Any]] = None 

1590 

1591 def __get__(self, instance: Any, owner: Any) -> _T: 

1592 if instance is None: 

1593 clsval = self.clslevel(owner) 

1594 return clsval 

1595 else: 

1596 return self.func(instance) 

1597 

1598 def __set__(self, instance: Any, value: Any) -> None: 

1599 assert self.setfn is not None 

1600 self.setfn(instance, value) 

1601 

1602 def setter(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]: 

1603 self.setfn = func 

1604 return self 

1605 

1606 def classlevel(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]: 

1607 self.clslevel = func 

1608 return self 

1609 

1610 

1611class hybridmethod(Generic[_T]): 

1612 """Decorate a function as cls- or instance- level.""" 

1613 

1614 def __init__(self, func: Callable[..., _T]): 

1615 self.func = self.__func__ = func 

1616 self.clslevel = func 

1617 

1618 def __get__(self, instance: Any, owner: Any) -> Callable[..., _T]: 

1619 if instance is None: 

1620 return self.clslevel.__get__(owner, owner.__class__) # type:ignore 

1621 else: 

1622 return self.func.__get__(instance, owner) # type:ignore 

1623 

1624 def classlevel(self, func: Callable[..., Any]) -> hybridmethod[_T]: 

1625 self.clslevel = func 

1626 return self 

1627 

1628 

1629class symbol(int): 

1630 """A constant symbol. 

1631 

1632 >>> symbol("foo") is symbol("foo") 

1633 True 

1634 >>> symbol("foo") 

1635 <symbol 'foo> 

1636 

1637 A slight refinement of the MAGICCOOKIE=object() pattern. The primary 

1638 advantage of symbol() is its repr(). They are also singletons. 

1639 

1640 Repeated calls of symbol('name') will all return the same instance. 

1641 

1642 """ 

1643 

1644 name: str 

1645 

1646 symbols: Dict[str, symbol] = {} 

1647 _lock = threading.Lock() 

1648 

1649 def __new__( 

1650 cls, 

1651 name: str, 

1652 doc: Optional[str] = None, 

1653 canonical: Optional[int] = None, 

1654 ) -> symbol: 

1655 with cls._lock: 

1656 sym = cls.symbols.get(name) 

1657 if sym is None: 

1658 assert isinstance(name, str) 

1659 if canonical is None: 

1660 canonical = hash(name) 

1661 sym = int.__new__(symbol, canonical) 

1662 sym.name = name 

1663 if doc: 

1664 sym.__doc__ = doc 

1665 

1666 # NOTE: we should ultimately get rid of this global thing, 

1667 # however, currently it is to support pickling. The best 

1668 # change would be when we are on py3.11 at a minimum, we 

1669 # switch to stdlib enum.IntFlag. 

1670 cls.symbols[name] = sym 

1671 else: 

1672 if canonical and canonical != sym: 

1673 raise TypeError( 

1674 f"Can't replace canonical symbol for {name!r} " 

1675 f"with new int value {canonical}" 

1676 ) 

1677 return sym 

1678 

1679 def __reduce__(self): 

1680 return symbol, (self.name, "x", int(self)) 

1681 

1682 def __str__(self): 

1683 return repr(self) 

1684 

1685 def __repr__(self): 

1686 return f"symbol({self.name!r})" 

1687 

1688 

1689class _IntFlagMeta(type): 

1690 def __init__( 

1691 cls, 

1692 classname: str, 

1693 bases: Tuple[Type[Any], ...], 

1694 dict_: Dict[str, Any], 

1695 **kw: Any, 

1696 ) -> None: 

1697 items: List[symbol] 

1698 cls._items = items = [] 

1699 for k, v in dict_.items(): 

1700 if re.match(r"^__.*__$", k): 

1701 continue 

1702 if isinstance(v, int): 

1703 sym = symbol(k, canonical=v) 

1704 elif not k.startswith("_"): 

1705 raise TypeError("Expected integer values for IntFlag") 

1706 else: 

1707 continue 

1708 setattr(cls, k, sym) 

1709 items.append(sym) 

1710 

1711 cls.__members__ = _collections.immutabledict( 

1712 {sym.name: sym for sym in items} 

1713 ) 

1714 

1715 def __iter__(self) -> Iterator[symbol]: 

1716 raise NotImplementedError( 

1717 "iter not implemented to ensure compatibility with " 

1718 "Python 3.11 IntFlag. Please use __members__. See " 

1719 "https://github.com/python/cpython/issues/99304" 

1720 ) 

1721 

1722 

1723class _FastIntFlag(metaclass=_IntFlagMeta): 

1724 """An 'IntFlag' copycat that isn't slow when performing bitwise 

1725 operations. 

1726 

1727 the ``FastIntFlag`` class will return ``enum.IntFlag`` under TYPE_CHECKING 

1728 and ``_FastIntFlag`` otherwise. 

1729 

1730 """ 

1731 

1732 

1733if TYPE_CHECKING: 

1734 from enum import IntFlag 

1735 

1736 FastIntFlag = IntFlag 

1737else: 

1738 FastIntFlag = _FastIntFlag 

1739 

1740 

1741_E = TypeVar("_E", bound=enum.Enum) 

1742 

1743 

1744def parse_user_argument_for_enum( 

1745 arg: Any, 

1746 choices: Dict[_E, List[Any]], 

1747 name: str, 

1748 resolve_symbol_names: bool = False, 

1749) -> Optional[_E]: 

1750 """Given a user parameter, parse the parameter into a chosen value 

1751 from a list of choice objects, typically Enum values. 

1752 

1753 The user argument can be a string name that matches the name of a 

1754 symbol, or the symbol object itself, or any number of alternate choices 

1755 such as True/False/ None etc. 

1756 

1757 :param arg: the user argument. 

1758 :param choices: dictionary of enum values to lists of possible 

1759 entries for each. 

1760 :param name: name of the argument. Used in an :class:`.ArgumentError` 

1761 that is raised if the parameter doesn't match any available argument. 

1762 

1763 """ 

1764 for enum_value, choice in choices.items(): 

1765 if arg is enum_value: 

1766 return enum_value 

1767 elif resolve_symbol_names and arg == enum_value.name: 

1768 return enum_value 

1769 elif arg in choice: 

1770 return enum_value 

1771 

1772 if arg is None: 

1773 return None 

1774 

1775 raise exc.ArgumentError(f"Invalid value for '{name}': {arg!r}") 

1776 

1777 

1778_creation_order = 1 

1779 

1780 

1781def set_creation_order(instance: Any) -> None: 

1782 """Assign a '_creation_order' sequence to the given instance. 

1783 

1784 This allows multiple instances to be sorted in order of creation 

1785 (typically within a single thread; the counter is not particularly 

1786 threadsafe). 

1787 

1788 """ 

1789 global _creation_order 

1790 instance._creation_order = _creation_order 

1791 _creation_order += 1 

1792 

1793 

1794def warn_exception(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: 

1795 """executes the given function, catches all exceptions and converts to 

1796 a warning. 

1797 

1798 """ 

1799 try: 

1800 return func(*args, **kwargs) 

1801 except Exception: 

1802 warn("%s('%s') ignored" % sys.exc_info()[0:2]) 

1803 

1804 

1805def ellipses_string(value, len_=25): 

1806 try: 

1807 if len(value) > len_: 

1808 return "%s..." % value[0:len_] 

1809 else: 

1810 return value 

1811 except TypeError: 

1812 return value 

1813 

1814 

1815class _hash_limit_string(str): 

1816 """A string subclass that can only be hashed on a maximum amount 

1817 of unique values. 

1818 

1819 This is used for warnings so that we can send out parameterized warnings 

1820 without the __warningregistry__ of the module, or the non-overridable 

1821 "once" registry within warnings.py, overloading memory, 

1822 

1823 

1824 """ 

1825 

1826 _hash: int 

1827 

1828 def __new__( 

1829 cls, value: str, num: int, args: Sequence[Any] 

1830 ) -> _hash_limit_string: 

1831 interpolated = (value % args) + ( 

1832 " (this warning may be suppressed after %d occurrences)" % num 

1833 ) 

1834 self = super().__new__(cls, interpolated) 

1835 self._hash = hash("%s_%d" % (value, hash(interpolated) % num)) 

1836 return self 

1837 

1838 def __hash__(self) -> int: 

1839 return self._hash 

1840 

1841 def __eq__(self, other: Any) -> bool: 

1842 return hash(self) == hash(other) 

1843 

1844 

1845def warn(msg: str, code: Optional[str] = None) -> None: 

1846 """Issue a warning. 

1847 

1848 If msg is a string, :class:`.exc.SAWarning` is used as 

1849 the category. 

1850 

1851 """ 

1852 if code: 

1853 _warnings_warn(exc.SAWarning(msg, code=code)) 

1854 else: 

1855 _warnings_warn(msg, exc.SAWarning) 

1856 

1857 

1858def warn_limited(msg: str, args: Sequence[Any]) -> None: 

1859 """Issue a warning with a parameterized string, limiting the number 

1860 of registrations. 

1861 

1862 """ 

1863 if args: 

1864 msg = _hash_limit_string(msg, 10, args) 

1865 _warnings_warn(msg, exc.SAWarning) 

1866 

1867 

1868_warning_tags: Dict[CodeType, Tuple[str, Type[Warning]]] = {} 

1869 

1870 

1871def tag_method_for_warnings( 

1872 message: str, category: Type[Warning] 

1873) -> Callable[[_F], _F]: 

1874 def go(fn): 

1875 _warning_tags[fn.__code__] = (message, category) 

1876 return fn 

1877 

1878 return go 

1879 

1880 

1881_not_sa_pattern = re.compile(r"^(?:sqlalchemy\.(?!testing)|alembic\.)") 

1882 

1883 

1884def _warnings_warn( 

1885 message: Union[str, Warning], 

1886 category: Optional[Type[Warning]] = None, 

1887 stacklevel: int = 2, 

1888) -> None: 

1889 

1890 if category is None and isinstance(message, Warning): 

1891 category = type(message) 

1892 

1893 # adjust the given stacklevel to be outside of SQLAlchemy 

1894 try: 

1895 frame = sys._getframe(stacklevel) 

1896 except ValueError: 

1897 # being called from less than 3 (or given) stacklevels, weird, 

1898 # but don't crash 

1899 stacklevel = 0 

1900 except: 

1901 # _getframe() doesn't work, weird interpreter issue, weird, 

1902 # ok, but don't crash 

1903 stacklevel = 0 

1904 else: 

1905 stacklevel_found = warning_tag_found = False 

1906 while frame is not None: 

1907 # using __name__ here requires that we have __name__ in the 

1908 # __globals__ of the decorated string functions we make also. 

1909 # we generate this using {"__name__": fn.__module__} 

1910 if not stacklevel_found and not re.match( 

1911 _not_sa_pattern, frame.f_globals.get("__name__", "") 

1912 ): 

1913 # stop incrementing stack level if an out-of-SQLA line 

1914 # were found. 

1915 stacklevel_found = True 

1916 

1917 # however, for the warning tag thing, we have to keep 

1918 # scanning up the whole traceback 

1919 

1920 if frame.f_code in _warning_tags: 

1921 warning_tag_found = True 

1922 (_suffix, _category) = _warning_tags[frame.f_code] 

1923 category = category or _category 

1924 message = f"{message} ({_suffix})" 

1925 

1926 frame = frame.f_back # type: ignore[assignment] 

1927 

1928 if not stacklevel_found: 

1929 stacklevel += 1 

1930 elif stacklevel_found and warning_tag_found: 

1931 break 

1932 

1933 if category is not None: 

1934 warnings.warn(message, category, stacklevel=stacklevel + 1) 

1935 else: 

1936 warnings.warn(message, stacklevel=stacklevel + 1) 

1937 

1938 

1939def only_once( 

1940 fn: Callable[..., _T], retry_on_exception: bool 

1941) -> Callable[..., Optional[_T]]: 

1942 """Decorate the given function to be a no-op after it is called exactly 

1943 once.""" 

1944 

1945 once = [fn] 

1946 

1947 def go(*arg: Any, **kw: Any) -> Optional[_T]: 

1948 # strong reference fn so that it isn't garbage collected, 

1949 # which interferes with the event system's expectations 

1950 strong_fn = fn # noqa 

1951 if once: 

1952 once_fn = once.pop() 

1953 try: 

1954 return once_fn(*arg, **kw) 

1955 except: 

1956 if retry_on_exception: 

1957 once.insert(0, once_fn) 

1958 raise 

1959 

1960 return None 

1961 

1962 return go 

1963 

1964 

1965_SQLA_RE = re.compile(r"sqlalchemy/([a-z_]+/){0,2}[a-z_]+\.py") 

1966_UNITTEST_RE = re.compile(r"unit(?:2|test2?/)") 

1967 

1968 

1969def chop_traceback( 

1970 tb: List[str], 

1971 exclude_prefix: re.Pattern[str] = _UNITTEST_RE, 

1972 exclude_suffix: re.Pattern[str] = _SQLA_RE, 

1973) -> List[str]: 

1974 """Chop extraneous lines off beginning and end of a traceback. 

1975 

1976 :param tb: 

1977 a list of traceback lines as returned by ``traceback.format_stack()`` 

1978 

1979 :param exclude_prefix: 

1980 a regular expression object matching lines to skip at beginning of 

1981 ``tb`` 

1982 

1983 :param exclude_suffix: 

1984 a regular expression object matching lines to skip at end of ``tb`` 

1985 """ 

1986 start = 0 

1987 end = len(tb) - 1 

1988 while start <= end and exclude_prefix.search(tb[start]): 

1989 start += 1 

1990 while start <= end and exclude_suffix.search(tb[end]): 

1991 end -= 1 

1992 return tb[start : end + 1] 

1993 

1994 

1995def attrsetter(attrname): 

1996 code = "def set(obj, value): obj.%s = value" % attrname 

1997 env = locals().copy() 

1998 exec(code, env) 

1999 return env["set"] 

2000 

2001 

2002_dunders = re.compile("^__.+__$") 

2003 

2004 

2005class TypingOnly: 

2006 """A mixin class that marks a class as 'typing only', meaning it has 

2007 absolutely no methods, attributes, or runtime functionality whatsoever. 

2008 

2009 """ 

2010 

2011 __slots__ = () 

2012 

2013 def __init_subclass__(cls) -> None: 

2014 if TypingOnly in cls.__bases__: 

2015 remaining = { 

2016 name for name in cls.__dict__ if not _dunders.match(name) 

2017 } 

2018 if remaining: 

2019 raise AssertionError( 

2020 f"Class {cls} directly inherits TypingOnly but has " 

2021 f"additional attributes {remaining}." 

2022 ) 

2023 super().__init_subclass__() 

2024 

2025 

2026class EnsureKWArg: 

2027 r"""Apply translation of functions to accept \**kw arguments if they 

2028 don't already. 

2029 

2030 Used to ensure cross-compatibility with third party legacy code, for things 

2031 like compiler visit methods that need to accept ``**kw`` arguments, 

2032 but may have been copied from old code that didn't accept them. 

2033 

2034 """ 

2035 

2036 ensure_kwarg: str 

2037 """a regular expression that indicates method names for which the method 

2038 should accept ``**kw`` arguments. 

2039 

2040 The class will scan for methods matching the name template and decorate 

2041 them if necessary to ensure ``**kw`` parameters are accepted. 

2042 

2043 """ 

2044 

2045 def __init_subclass__(cls) -> None: 

2046 fn_reg = cls.ensure_kwarg 

2047 clsdict = cls.__dict__ 

2048 if fn_reg: 

2049 for key in clsdict: 

2050 m = re.match(fn_reg, key) 

2051 if m: 

2052 fn = clsdict[key] 

2053 spec = compat.inspect_getfullargspec(fn) 

2054 if not spec.varkw: 

2055 wrapped = cls._wrap_w_kw(fn) 

2056 setattr(cls, key, wrapped) 

2057 super().__init_subclass__() 

2058 

2059 @classmethod 

2060 def _wrap_w_kw(cls, fn: Callable[..., Any]) -> Callable[..., Any]: 

2061 def wrap(*arg: Any, **kw: Any) -> Any: 

2062 return fn(*arg) 

2063 

2064 return update_wrapper(wrap, fn) 

2065 

2066 

2067def wrap_callable(wrapper, fn): 

2068 """Augment functools.update_wrapper() to work with objects with 

2069 a ``__call__()`` method. 

2070 

2071 :param fn: 

2072 object with __call__ method 

2073 

2074 """ 

2075 if hasattr(fn, "__name__"): 

2076 return update_wrapper(wrapper, fn) 

2077 else: 

2078 _f = wrapper 

2079 _f.__name__ = fn.__class__.__name__ 

2080 if hasattr(fn, "__module__"): 

2081 _f.__module__ = fn.__module__ 

2082 

2083 if hasattr(fn.__call__, "__doc__") and fn.__call__.__doc__: 

2084 _f.__doc__ = fn.__call__.__doc__ 

2085 elif fn.__doc__: 

2086 _f.__doc__ = fn.__doc__ 

2087 

2088 return _f 

2089 

2090 

2091def quoted_token_parser(value): 

2092 """Parse a dotted identifier with accommodation for quoted names. 

2093 

2094 Includes support for SQL-style double quotes as a literal character. 

2095 

2096 E.g.:: 

2097 

2098 >>> quoted_token_parser("name") 

2099 ["name"] 

2100 >>> quoted_token_parser("schema.name") 

2101 ["schema", "name"] 

2102 >>> quoted_token_parser('"Schema"."Name"') 

2103 ['Schema', 'Name'] 

2104 >>> quoted_token_parser('"Schema"."Name""Foo"') 

2105 ['Schema', 'Name""Foo'] 

2106 

2107 """ 

2108 

2109 if '"' not in value: 

2110 return value.split(".") 

2111 

2112 # 0 = outside of quotes 

2113 # 1 = inside of quotes 

2114 state = 0 

2115 result: List[List[str]] = [[]] 

2116 idx = 0 

2117 lv = len(value) 

2118 while idx < lv: 

2119 char = value[idx] 

2120 if char == '"': 

2121 if state == 1 and idx < lv - 1 and value[idx + 1] == '"': 

2122 result[-1].append('"') 

2123 idx += 1 

2124 else: 

2125 state ^= 1 

2126 elif char == "." and state == 0: 

2127 result.append([]) 

2128 else: 

2129 result[-1].append(char) 

2130 idx += 1 

2131 

2132 return ["".join(token) for token in result] 

2133 

2134 

2135def add_parameter_text(params: Any, text: str) -> Callable[[_F], _F]: 

2136 params = _collections.to_list(params) 

2137 

2138 def decorate(fn): 

2139 doc = fn.__doc__ is not None and fn.__doc__ or "" 

2140 if doc: 

2141 doc = inject_param_text(doc, {param: text for param in params}) 

2142 fn.__doc__ = doc 

2143 return fn 

2144 

2145 return decorate 

2146 

2147 

2148def _dedent_docstring(text: str) -> str: 

2149 split_text = text.split("\n", 1) 

2150 if len(split_text) == 1: 

2151 return text 

2152 else: 

2153 firstline, remaining = split_text 

2154 if not firstline.startswith(" "): 

2155 return firstline + "\n" + textwrap.dedent(remaining) 

2156 else: 

2157 return textwrap.dedent(text) 

2158 

2159 

2160def inject_docstring_text( 

2161 given_doctext: Optional[str], injecttext: str, pos: int 

2162) -> str: 

2163 doctext: str = _dedent_docstring(given_doctext or "") 

2164 lines = doctext.split("\n") 

2165 if len(lines) == 1: 

2166 lines.append("") 

2167 injectlines = textwrap.dedent(injecttext).split("\n") 

2168 if injectlines[0]: 

2169 injectlines.insert(0, "") 

2170 

2171 blanks = [num for num, line in enumerate(lines) if not line.strip()] 

2172 blanks.insert(0, 0) 

2173 

2174 inject_pos = blanks[min(pos, len(blanks) - 1)] 

2175 

2176 lines = lines[0:inject_pos] + injectlines + lines[inject_pos:] 

2177 return "\n".join(lines) 

2178 

2179 

2180_param_reg = re.compile(r"(\s+):param (.+?):") 

2181 

2182 

2183def inject_param_text(doctext: str, inject_params: Dict[str, str]) -> str: 

2184 doclines = collections.deque(doctext.splitlines()) 

2185 lines = [] 

2186 

2187 # TODO: this is not working for params like ":param case_sensitive=True:" 

2188 

2189 to_inject = None 

2190 while doclines: 

2191 line = doclines.popleft() 

2192 

2193 m = _param_reg.match(line) 

2194 

2195 if to_inject is None: 

2196 if m: 

2197 param = m.group(2).lstrip("*") 

2198 if param in inject_params: 

2199 # default indent to that of :param: plus one 

2200 indent = " " * len(m.group(1)) + " " 

2201 

2202 # but if the next line has text, use that line's 

2203 # indentation 

2204 if doclines: 

2205 m2 = re.match(r"(\s+)\S", doclines[0]) 

2206 if m2: 

2207 indent = " " * len(m2.group(1)) 

2208 

2209 to_inject = indent + inject_params[param] 

2210 elif m: 

2211 lines.extend(["\n", to_inject, "\n"]) 

2212 to_inject = None 

2213 elif not line.rstrip(): 

2214 lines.extend([line, to_inject, "\n"]) 

2215 to_inject = None 

2216 elif line.endswith("::"): 

2217 # TODO: this still won't cover if the code example itself has 

2218 # blank lines in it, need to detect those via indentation. 

2219 lines.extend([line, doclines.popleft()]) 

2220 continue 

2221 lines.append(line) 

2222 

2223 return "\n".join(lines) 

2224 

2225 

2226def repr_tuple_names(names: List[str]) -> Optional[str]: 

2227 """Trims a list of strings from the middle and return a string of up to 

2228 four elements. Strings greater than 11 characters will be truncated""" 

2229 if len(names) == 0: 

2230 return None 

2231 flag = len(names) <= 4 

2232 names = names[0:4] if flag else names[0:3] + names[-1:] 

2233 res = ["%s.." % name[:11] if len(name) > 11 else name for name in names] 

2234 if flag: 

2235 return ", ".join(res) 

2236 else: 

2237 return "%s, ..., %s" % (", ".join(res[0:3]), res[-1]) 

2238 

2239 

2240def has_compiled_ext(raise_=False): 

2241 from ._has_cython import HAS_CYEXTENSION 

2242 

2243 if HAS_CYEXTENSION: 

2244 return True 

2245 elif raise_: 

2246 raise ImportError( 

2247 "cython extensions were expected to be installed, " 

2248 "but are not present" 

2249 ) 

2250 else: 

2251 return False 

2252 

2253 

2254def load_uncompiled_module(module: _M) -> _M: 

2255 """Load the non-compied version of a module that is also 

2256 compiled with cython. 

2257 """ 

2258 full_name = module.__name__ 

2259 assert module.__spec__ 

2260 parent_name = module.__spec__.parent 

2261 assert parent_name 

2262 parent_module = sys.modules[parent_name] 

2263 assert parent_module.__spec__ 

2264 package_path = parent_module.__spec__.origin 

2265 assert package_path and package_path.endswith("__init__.py") 

2266 

2267 name = full_name.split(".")[-1] 

2268 module_path = package_path.replace("__init__.py", f"{name}.py") 

2269 

2270 py_spec = importlib.util.spec_from_file_location(full_name, module_path) 

2271 assert py_spec 

2272 py_module = importlib.util.module_from_spec(py_spec) 

2273 assert py_spec.loader 

2274 py_spec.loader.exec_module(py_module) 

2275 return cast(_M, py_module) 

2276 

2277 

2278class _Missing(enum.Enum): 

2279 Missing = enum.auto() 

2280 

2281 

2282Missing = _Missing.Missing 

2283MissingOr = Union[_T, Literal[_Missing.Missing]]