Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py: 55%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1050 statements  

1# util/langhelpers.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Routines to help with the creation, loading and introspection of 

10modules, classes, hierarchies, attributes, functions, and methods. 

11 

12""" 

13from __future__ import annotations 

14 

15import collections 

16import enum 

17from functools import update_wrapper 

18import inspect 

19import itertools 

20import operator 

21import re 

22import sys 

23import textwrap 

24import threading 

25import types 

26from types import CodeType 

27from typing import Any 

28from typing import Callable 

29from typing import cast 

30from typing import Dict 

31from typing import FrozenSet 

32from typing import Generic 

33from typing import Iterator 

34from typing import List 

35from typing import Mapping 

36from typing import NoReturn 

37from typing import Optional 

38from typing import overload 

39from typing import Sequence 

40from typing import Set 

41from typing import Tuple 

42from typing import Type 

43from typing import TYPE_CHECKING 

44from typing import TypeVar 

45from typing import Union 

46import warnings 

47 

48from . import _collections 

49from . import compat 

50from ._has_cy import HAS_CYEXTENSION 

51from .typing import Literal 

52from .. import exc 

53 

54_T = TypeVar("_T") 

55_T_co = TypeVar("_T_co", covariant=True) 

56_F = TypeVar("_F", bound=Callable[..., Any]) 

57_MP = TypeVar("_MP", bound="memoized_property[Any]") 

58_MA = TypeVar("_MA", bound="HasMemoized.memoized_attribute[Any]") 

59_HP = TypeVar("_HP", bound="hybridproperty[Any]") 

60_HM = TypeVar("_HM", bound="hybridmethod[Any]") 

61 

62 

63if compat.py314: 

64 # vendor a minimal form of get_annotations per 

65 # https://github.com/python/cpython/issues/133684#issuecomment-2863841891 

66 

67 from annotationlib import call_annotate_function # type: ignore 

68 from annotationlib import Format 

69 

70 def _get_and_call_annotate(obj, format): # noqa: A002 

71 annotate = getattr(obj, "__annotate__", None) 

72 if annotate is not None: 

73 ann = call_annotate_function(annotate, format, owner=obj) 

74 if not isinstance(ann, dict): 

75 raise ValueError(f"{obj!r}.__annotate__ returned a non-dict") 

76 return ann 

77 return None 

78 

79 # this is ported from py3.13.0a7 

80 _BASE_GET_ANNOTATIONS = type.__dict__["__annotations__"].__get__ # type: ignore # noqa: E501 

81 

82 def _get_dunder_annotations(obj): 

83 if isinstance(obj, type): 

84 try: 

85 ann = _BASE_GET_ANNOTATIONS(obj) 

86 except AttributeError: 

87 # For static types, the descriptor raises AttributeError. 

88 return {} 

89 else: 

90 ann = getattr(obj, "__annotations__", None) 

91 if ann is None: 

92 return {} 

93 

94 if not isinstance(ann, dict): 

95 raise ValueError( 

96 f"{obj!r}.__annotations__ is neither a dict nor None" 

97 ) 

98 return dict(ann) 

99 

100 def _vendored_get_annotations( 

101 obj: Any, *, format: Format # noqa: A002 

102 ) -> Mapping[str, Any]: 

103 """A sparse implementation of annotationlib.get_annotations()""" 

104 

105 try: 

106 ann = _get_dunder_annotations(obj) 

107 except Exception: 

108 pass 

109 else: 

110 if ann is not None: 

111 return dict(ann) 

112 

113 # But if __annotations__ threw a NameError, we try calling __annotate__ 

114 ann = _get_and_call_annotate(obj, format) 

115 if ann is None: 

116 # If that didn't work either, we have a very weird object: 

117 # evaluating 

118 # __annotations__ threw NameError and there is no __annotate__. 

119 # In that case, 

120 # we fall back to trying __annotations__ again. 

121 ann = _get_dunder_annotations(obj) 

122 

123 if ann is None: 

124 if isinstance(obj, type) or callable(obj): 

125 return {} 

126 raise TypeError(f"{obj!r} does not have annotations") 

127 

128 if not ann: 

129 return {} 

130 

131 return dict(ann) 

132 

133 def get_annotations(obj: Any) -> Mapping[str, Any]: 

134 # FORWARDREF has the effect of giving us ForwardRefs and not 

135 # actually trying to evaluate the annotations. We need this so 

136 # that the annotations act as much like 

137 # "from __future__ import annotations" as possible, which is going 

138 # away in future python as a separate mode 

139 return _vendored_get_annotations(obj, format=Format.FORWARDREF) 

140 

141elif compat.py310: 

142 

143 def get_annotations(obj: Any) -> Mapping[str, Any]: 

144 return inspect.get_annotations(obj) 

145 

146else: 

147 

148 def get_annotations(obj: Any) -> Mapping[str, Any]: 

149 # it's been observed that cls.__annotations__ can be non present. 

150 # it's not clear what causes this, running under tox py37/38 it 

151 # happens, running straight pytest it doesnt 

152 

153 # https://docs.python.org/3/howto/annotations.html#annotations-howto 

154 if isinstance(obj, type): 

155 ann = obj.__dict__.get("__annotations__", None) 

156 else: 

157 ann = getattr(obj, "__annotations__", None) 

158 

159 if ann is None: 

160 return _collections.EMPTY_DICT 

161 else: 

162 return cast("Mapping[str, Any]", ann) 

163 

164 

165def md5_hex(x: Any) -> str: 

166 x = x.encode("utf-8") 

167 m = compat.md5_not_for_security() 

168 m.update(x) 

169 return cast(str, m.hexdigest()) 

170 

171 

172class safe_reraise: 

173 """Reraise an exception after invoking some 

174 handler code. 

175 

176 Stores the existing exception info before 

177 invoking so that it is maintained across a potential 

178 coroutine context switch. 

179 

180 e.g.:: 

181 

182 try: 

183 sess.commit() 

184 except: 

185 with safe_reraise(): 

186 sess.rollback() 

187 

188 TODO: we should at some point evaluate current behaviors in this regard 

189 based on current greenlet, gevent/eventlet implementations in Python 3, and 

190 also see the degree to which our own asyncio (based on greenlet also) is 

191 impacted by this. .rollback() will cause IO / context switch to occur in 

192 all these scenarios; what happens to the exception context from an 

193 "except:" block if we don't explicitly store it? Original issue was #2703. 

194 

195 """ 

196 

197 __slots__ = ("_exc_info",) 

198 

199 _exc_info: Union[ 

200 None, 

201 Tuple[ 

202 Type[BaseException], 

203 BaseException, 

204 types.TracebackType, 

205 ], 

206 Tuple[None, None, None], 

207 ] 

208 

209 def __enter__(self) -> None: 

210 self._exc_info = sys.exc_info() 

211 

212 def __exit__( 

213 self, 

214 type_: Optional[Type[BaseException]], 

215 value: Optional[BaseException], 

216 traceback: Optional[types.TracebackType], 

217 ) -> NoReturn: 

218 assert self._exc_info is not None 

219 # see #2703 for notes 

220 if type_ is None: 

221 exc_type, exc_value, exc_tb = self._exc_info 

222 assert exc_value is not None 

223 self._exc_info = None # remove potential circular references 

224 raise exc_value.with_traceback(exc_tb) 

225 else: 

226 self._exc_info = None # remove potential circular references 

227 assert value is not None 

228 raise value.with_traceback(traceback) 

229 

230 

231def walk_subclasses(cls: Type[_T]) -> Iterator[Type[_T]]: 

232 seen: Set[Any] = set() 

233 

234 stack = [cls] 

235 while stack: 

236 cls = stack.pop() 

237 if cls in seen: 

238 continue 

239 else: 

240 seen.add(cls) 

241 stack.extend(cls.__subclasses__()) 

242 yield cls 

243 

244 

245def string_or_unprintable(element: Any) -> str: 

246 if isinstance(element, str): 

247 return element 

248 else: 

249 try: 

250 return str(element) 

251 except Exception: 

252 return "unprintable element %r" % element 

253 

254 

255def clsname_as_plain_name( 

256 cls: Type[Any], use_name: Optional[str] = None 

257) -> str: 

258 name = use_name or cls.__name__ 

259 return " ".join(n.lower() for n in re.findall(r"([A-Z][a-z]+|SQL)", name)) 

260 

261 

262def method_is_overridden( 

263 instance_or_cls: Union[Type[Any], object], 

264 against_method: Callable[..., Any], 

265) -> bool: 

266 """Return True if the two class methods don't match.""" 

267 

268 if not isinstance(instance_or_cls, type): 

269 current_cls = instance_or_cls.__class__ 

270 else: 

271 current_cls = instance_or_cls 

272 

273 method_name = against_method.__name__ 

274 

275 current_method: types.MethodType = getattr(current_cls, method_name) 

276 

277 return current_method != against_method 

278 

279 

280def decode_slice(slc: slice) -> Tuple[Any, ...]: 

281 """decode a slice object as sent to __getitem__. 

282 

283 takes into account the 2.5 __index__() method, basically. 

284 

285 """ 

286 ret: List[Any] = [] 

287 for x in slc.start, slc.stop, slc.step: 

288 if hasattr(x, "__index__"): 

289 x = x.__index__() 

290 ret.append(x) 

291 return tuple(ret) 

292 

293 

294def _unique_symbols(used: Sequence[str], *bases: str) -> Iterator[str]: 

295 used_set = set(used) 

296 for base in bases: 

297 pool = itertools.chain( 

298 (base,), 

299 map(lambda i: base + str(i), range(1000)), 

300 ) 

301 for sym in pool: 

302 if sym not in used_set: 

303 used_set.add(sym) 

304 yield sym 

305 break 

306 else: 

307 raise NameError("exhausted namespace for symbol base %s" % base) 

308 

309 

310def map_bits(fn: Callable[[int], Any], n: int) -> Iterator[Any]: 

311 """Call the given function given each nonzero bit from n.""" 

312 

313 while n: 

314 b = n & (~n + 1) 

315 yield fn(b) 

316 n ^= b 

317 

318 

319_Fn = TypeVar("_Fn", bound="Callable[..., Any]") 

320 

321# this seems to be in flux in recent mypy versions 

322 

323 

324def decorator(target: Callable[..., Any]) -> Callable[[_Fn], _Fn]: 

325 """A signature-matching decorator factory.""" 

326 

327 def decorate(fn: _Fn) -> _Fn: 

328 if not inspect.isfunction(fn) and not inspect.ismethod(fn): 

329 raise Exception("not a decoratable function") 

330 

331 # Python 3.14 defer creating __annotations__ until its used. 

332 # We do not want to create __annotations__ now. 

333 annofunc = getattr(fn, "__annotate__", None) 

334 if annofunc is not None: 

335 fn.__annotate__ = None # type: ignore[union-attr] 

336 try: 

337 spec = compat.inspect_getfullargspec(fn) 

338 finally: 

339 fn.__annotate__ = annofunc # type: ignore[union-attr] 

340 else: 

341 spec = compat.inspect_getfullargspec(fn) 

342 

343 # Do not generate code for annotations. 

344 # update_wrapper() copies the annotation from fn to decorated. 

345 # We use dummy defaults for code generation to avoid having 

346 # copy of large globals for compiling. 

347 # We copy __defaults__ and __kwdefaults__ from fn to decorated. 

348 empty_defaults = (None,) * len(spec.defaults or ()) 

349 empty_kwdefaults = dict.fromkeys(spec.kwonlydefaults or ()) 

350 spec = spec._replace( 

351 annotations={}, 

352 defaults=empty_defaults, 

353 kwonlydefaults=empty_kwdefaults, 

354 ) 

355 

356 names = ( 

357 tuple(cast("Tuple[str, ...]", spec[0])) 

358 + cast("Tuple[str, ...]", spec[1:3]) 

359 + (fn.__name__,) 

360 ) 

361 targ_name, fn_name = _unique_symbols(names, "target", "fn") 

362 

363 metadata: Dict[str, Optional[str]] = dict(target=targ_name, fn=fn_name) 

364 metadata.update(format_argspec_plus(spec, grouped=False)) 

365 metadata["name"] = fn.__name__ 

366 

367 if inspect.iscoroutinefunction(fn): 

368 metadata["prefix"] = "async " 

369 metadata["target_prefix"] = "await " 

370 else: 

371 metadata["prefix"] = "" 

372 metadata["target_prefix"] = "" 

373 

374 # look for __ positional arguments. This is a convention in 

375 # SQLAlchemy that arguments should be passed positionally 

376 # rather than as keyword 

377 # arguments. note that apply_pos doesn't currently work in all cases 

378 # such as when a kw-only indicator "*" is present, which is why 

379 # we limit the use of this to just that case we can detect. As we add 

380 # more kinds of methods that use @decorator, things may have to 

381 # be further improved in this area 

382 if "__" in repr(spec[0]): 

383 code = ( 

384 """\ 

385%(prefix)sdef %(name)s%(grouped_args)s: 

386 return %(target_prefix)s%(target)s(%(fn)s, %(apply_pos)s) 

387""" 

388 % metadata 

389 ) 

390 else: 

391 code = ( 

392 """\ 

393%(prefix)sdef %(name)s%(grouped_args)s: 

394 return %(target_prefix)s%(target)s(%(fn)s, %(apply_kw)s) 

395""" 

396 % metadata 

397 ) 

398 

399 env: Dict[str, Any] = { 

400 targ_name: target, 

401 fn_name: fn, 

402 "__name__": fn.__module__, 

403 } 

404 

405 decorated = cast( 

406 types.FunctionType, 

407 _exec_code_in_env(code, env, fn.__name__), 

408 ) 

409 decorated.__defaults__ = fn.__defaults__ 

410 decorated.__kwdefaults__ = fn.__kwdefaults__ # type: ignore 

411 return update_wrapper(decorated, fn) # type: ignore[return-value] 

412 

413 return update_wrapper(decorate, target) # type: ignore[return-value] 

414 

415 

416def _exec_code_in_env( 

417 code: Union[str, types.CodeType], env: Dict[str, Any], fn_name: str 

418) -> Callable[..., Any]: 

419 exec(code, env) 

420 return env[fn_name] # type: ignore[no-any-return] 

421 

422 

423_PF = TypeVar("_PF") 

424_TE = TypeVar("_TE") 

425 

426 

427class PluginLoader: 

428 def __init__( 

429 self, group: str, auto_fn: Optional[Callable[..., Any]] = None 

430 ): 

431 self.group = group 

432 self.impls: Dict[str, Any] = {} 

433 self.auto_fn = auto_fn 

434 

435 def clear(self): 

436 self.impls.clear() 

437 

438 def load(self, name: str) -> Any: 

439 if name in self.impls: 

440 return self.impls[name]() 

441 

442 if self.auto_fn: 

443 loader = self.auto_fn(name) 

444 if loader: 

445 self.impls[name] = loader 

446 return loader() 

447 

448 for impl in compat.importlib_metadata_get(self.group): 

449 if impl.name == name: 

450 self.impls[name] = impl.load 

451 return impl.load() 

452 

453 raise exc.NoSuchModuleError( 

454 "Can't load plugin: %s:%s" % (self.group, name) 

455 ) 

456 

457 def register(self, name: str, modulepath: str, objname: str) -> None: 

458 def load(): 

459 mod = __import__(modulepath) 

460 for token in modulepath.split(".")[1:]: 

461 mod = getattr(mod, token) 

462 return getattr(mod, objname) 

463 

464 self.impls[name] = load 

465 

466 def deregister(self, name: str) -> None: 

467 del self.impls[name] 

468 

469 

470def _inspect_func_args(fn): 

471 try: 

472 co_varkeywords = inspect.CO_VARKEYWORDS 

473 except AttributeError: 

474 # https://docs.python.org/3/library/inspect.html 

475 # The flags are specific to CPython, and may not be defined in other 

476 # Python implementations. Furthermore, the flags are an implementation 

477 # detail, and can be removed or deprecated in future Python releases. 

478 spec = compat.inspect_getfullargspec(fn) 

479 return spec[0], bool(spec[2]) 

480 else: 

481 # use fn.__code__ plus flags to reduce method call overhead 

482 co = fn.__code__ 

483 nargs = co.co_argcount 

484 return ( 

485 list(co.co_varnames[:nargs]), 

486 bool(co.co_flags & co_varkeywords), 

487 ) 

488 

489 

490@overload 

491def get_cls_kwargs( 

492 cls: type, 

493 *, 

494 _set: Optional[Set[str]] = None, 

495 raiseerr: Literal[True] = ..., 

496) -> Set[str]: ... 

497 

498 

499@overload 

500def get_cls_kwargs( 

501 cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False 

502) -> Optional[Set[str]]: ... 

503 

504 

505def get_cls_kwargs( 

506 cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False 

507) -> Optional[Set[str]]: 

508 r"""Return the full set of inherited kwargs for the given `cls`. 

509 

510 Probes a class's __init__ method, collecting all named arguments. If the 

511 __init__ defines a \**kwargs catch-all, then the constructor is presumed 

512 to pass along unrecognized keywords to its base classes, and the 

513 collection process is repeated recursively on each of the bases. 

514 

515 Uses a subset of inspect.getfullargspec() to cut down on method overhead, 

516 as this is used within the Core typing system to create copies of type 

517 objects which is a performance-sensitive operation. 

518 

519 No anonymous tuple arguments please ! 

520 

521 """ 

522 toplevel = _set is None 

523 if toplevel: 

524 _set = set() 

525 assert _set is not None 

526 

527 ctr = cls.__dict__.get("__init__", False) 

528 

529 has_init = ( 

530 ctr 

531 and isinstance(ctr, types.FunctionType) 

532 and isinstance(ctr.__code__, types.CodeType) 

533 ) 

534 

535 if has_init: 

536 names, has_kw = _inspect_func_args(ctr) 

537 _set.update(names) 

538 

539 if not has_kw and not toplevel: 

540 if raiseerr: 

541 raise TypeError( 

542 f"given cls {cls} doesn't have an __init__ method" 

543 ) 

544 else: 

545 return None 

546 else: 

547 has_kw = False 

548 

549 if not has_init or has_kw: 

550 for c in cls.__bases__: 

551 if get_cls_kwargs(c, _set=_set) is None: 

552 break 

553 

554 _set.discard("self") 

555 return _set 

556 

557 

558def get_func_kwargs(func: Callable[..., Any]) -> List[str]: 

559 """Return the set of legal kwargs for the given `func`. 

560 

561 Uses getargspec so is safe to call for methods, functions, 

562 etc. 

563 

564 """ 

565 

566 return compat.inspect_getfullargspec(func)[0] 

567 

568 

569def get_callable_argspec( 

570 fn: Callable[..., Any], no_self: bool = False, _is_init: bool = False 

571) -> compat.FullArgSpec: 

572 """Return the argument signature for any callable. 

573 

574 All pure-Python callables are accepted, including 

575 functions, methods, classes, objects with __call__; 

576 builtins and other edge cases like functools.partial() objects 

577 raise a TypeError. 

578 

579 """ 

580 if inspect.isbuiltin(fn): 

581 raise TypeError("Can't inspect builtin: %s" % fn) 

582 elif inspect.isfunction(fn): 

583 if _is_init and no_self: 

584 spec = compat.inspect_getfullargspec(fn) 

585 return compat.FullArgSpec( 

586 spec.args[1:], 

587 spec.varargs, 

588 spec.varkw, 

589 spec.defaults, 

590 spec.kwonlyargs, 

591 spec.kwonlydefaults, 

592 spec.annotations, 

593 ) 

594 else: 

595 return compat.inspect_getfullargspec(fn) 

596 elif inspect.ismethod(fn): 

597 if no_self and (_is_init or fn.__self__): 

598 spec = compat.inspect_getfullargspec(fn.__func__) 

599 return compat.FullArgSpec( 

600 spec.args[1:], 

601 spec.varargs, 

602 spec.varkw, 

603 spec.defaults, 

604 spec.kwonlyargs, 

605 spec.kwonlydefaults, 

606 spec.annotations, 

607 ) 

608 else: 

609 return compat.inspect_getfullargspec(fn.__func__) 

610 elif inspect.isclass(fn): 

611 return get_callable_argspec( 

612 fn.__init__, no_self=no_self, _is_init=True 

613 ) 

614 elif hasattr(fn, "__func__"): 

615 return compat.inspect_getfullargspec(fn.__func__) 

616 elif hasattr(fn, "__call__"): 

617 if inspect.ismethod(fn.__call__): 

618 return get_callable_argspec(fn.__call__, no_self=no_self) 

619 else: 

620 raise TypeError("Can't inspect callable: %s" % fn) 

621 else: 

622 raise TypeError("Can't inspect callable: %s" % fn) 

623 

624 

625def format_argspec_plus( 

626 fn: Union[Callable[..., Any], compat.FullArgSpec], grouped: bool = True 

627) -> Dict[str, Optional[str]]: 

628 """Returns a dictionary of formatted, introspected function arguments. 

629 

630 A enhanced variant of inspect.formatargspec to support code generation. 

631 

632 fn 

633 An inspectable callable or tuple of inspect getargspec() results. 

634 grouped 

635 Defaults to True; include (parens, around, argument) lists 

636 

637 Returns: 

638 

639 args 

640 Full inspect.formatargspec for fn 

641 self_arg 

642 The name of the first positional argument, varargs[0], or None 

643 if the function defines no positional arguments. 

644 apply_pos 

645 args, re-written in calling rather than receiving syntax. Arguments are 

646 passed positionally. 

647 apply_kw 

648 Like apply_pos, except keyword-ish args are passed as keywords. 

649 apply_pos_proxied 

650 Like apply_pos but omits the self/cls argument 

651 

652 Example:: 

653 

654 >>> format_argspec_plus(lambda self, a, b, c=3, **d: 123) 

655 {'grouped_args': '(self, a, b, c=3, **d)', 

656 'self_arg': 'self', 

657 'apply_kw': '(self, a, b, c=c, **d)', 

658 'apply_pos': '(self, a, b, c, **d)'} 

659 

660 """ 

661 if callable(fn): 

662 spec = compat.inspect_getfullargspec(fn) 

663 else: 

664 spec = fn 

665 

666 args = compat.inspect_formatargspec(*spec) 

667 

668 apply_pos = compat.inspect_formatargspec( 

669 spec[0], spec[1], spec[2], None, spec[4] 

670 ) 

671 

672 if spec[0]: 

673 self_arg = spec[0][0] 

674 

675 apply_pos_proxied = compat.inspect_formatargspec( 

676 spec[0][1:], spec[1], spec[2], None, spec[4] 

677 ) 

678 

679 elif spec[1]: 

680 # I'm not sure what this is 

681 self_arg = "%s[0]" % spec[1] 

682 

683 apply_pos_proxied = apply_pos 

684 else: 

685 self_arg = None 

686 apply_pos_proxied = apply_pos 

687 

688 num_defaults = 0 

689 if spec[3]: 

690 num_defaults += len(cast(Tuple[Any], spec[3])) 

691 if spec[4]: 

692 num_defaults += len(spec[4]) 

693 

694 name_args = spec[0] + spec[4] 

695 

696 defaulted_vals: Union[List[str], Tuple[()]] 

697 

698 if num_defaults: 

699 defaulted_vals = name_args[0 - num_defaults :] 

700 else: 

701 defaulted_vals = () 

702 

703 apply_kw = compat.inspect_formatargspec( 

704 name_args, 

705 spec[1], 

706 spec[2], 

707 defaulted_vals, 

708 formatvalue=lambda x: "=" + str(x), 

709 ) 

710 

711 if spec[0]: 

712 apply_kw_proxied = compat.inspect_formatargspec( 

713 name_args[1:], 

714 spec[1], 

715 spec[2], 

716 defaulted_vals, 

717 formatvalue=lambda x: "=" + str(x), 

718 ) 

719 else: 

720 apply_kw_proxied = apply_kw 

721 

722 if grouped: 

723 return dict( 

724 grouped_args=args, 

725 self_arg=self_arg, 

726 apply_pos=apply_pos, 

727 apply_kw=apply_kw, 

728 apply_pos_proxied=apply_pos_proxied, 

729 apply_kw_proxied=apply_kw_proxied, 

730 ) 

731 else: 

732 return dict( 

733 grouped_args=args, 

734 self_arg=self_arg, 

735 apply_pos=apply_pos[1:-1], 

736 apply_kw=apply_kw[1:-1], 

737 apply_pos_proxied=apply_pos_proxied[1:-1], 

738 apply_kw_proxied=apply_kw_proxied[1:-1], 

739 ) 

740 

741 

742def format_argspec_init(method, grouped=True): 

743 """format_argspec_plus with considerations for typical __init__ methods 

744 

745 Wraps format_argspec_plus with error handling strategies for typical 

746 __init__ cases: 

747 

748 .. sourcecode:: text 

749 

750 object.__init__ -> (self) 

751 other unreflectable (usually C) -> (self, *args, **kwargs) 

752 

753 """ 

754 if method is object.__init__: 

755 grouped_args = "(self)" 

756 args = "(self)" if grouped else "self" 

757 proxied = "()" if grouped else "" 

758 else: 

759 try: 

760 return format_argspec_plus(method, grouped=grouped) 

761 except TypeError: 

762 grouped_args = "(self, *args, **kwargs)" 

763 args = grouped_args if grouped else "self, *args, **kwargs" 

764 proxied = "(*args, **kwargs)" if grouped else "*args, **kwargs" 

765 return dict( 

766 self_arg="self", 

767 grouped_args=grouped_args, 

768 apply_pos=args, 

769 apply_kw=args, 

770 apply_pos_proxied=proxied, 

771 apply_kw_proxied=proxied, 

772 ) 

773 

774 

775def create_proxy_methods( 

776 target_cls: Type[Any], 

777 target_cls_sphinx_name: str, 

778 proxy_cls_sphinx_name: str, 

779 classmethods: Sequence[str] = (), 

780 methods: Sequence[str] = (), 

781 attributes: Sequence[str] = (), 

782 use_intermediate_variable: Sequence[str] = (), 

783) -> Callable[[_T], _T]: 

784 """A class decorator indicating attributes should refer to a proxy 

785 class. 

786 

787 This decorator is now a "marker" that does nothing at runtime. Instead, 

788 it is consumed by the tools/generate_proxy_methods.py script to 

789 statically generate proxy methods and attributes that are fully 

790 recognized by typing tools such as mypy. 

791 

792 """ 

793 

794 def decorate(cls): 

795 return cls 

796 

797 return decorate 

798 

799 

800def getargspec_init(method): 

801 """inspect.getargspec with considerations for typical __init__ methods 

802 

803 Wraps inspect.getargspec with error handling for typical __init__ cases: 

804 

805 .. sourcecode:: text 

806 

807 object.__init__ -> (self) 

808 other unreflectable (usually C) -> (self, *args, **kwargs) 

809 

810 """ 

811 try: 

812 return compat.inspect_getfullargspec(method) 

813 except TypeError: 

814 if method is object.__init__: 

815 return (["self"], None, None, None) 

816 else: 

817 return (["self"], "args", "kwargs", None) 

818 

819 

820def unbound_method_to_callable(func_or_cls): 

821 """Adjust the incoming callable such that a 'self' argument is not 

822 required. 

823 

824 """ 

825 

826 if isinstance(func_or_cls, types.MethodType) and not func_or_cls.__self__: 

827 return func_or_cls.__func__ 

828 else: 

829 return func_or_cls 

830 

831 

832def generic_repr( 

833 obj: Any, 

834 additional_kw: Sequence[Tuple[str, Any]] = (), 

835 to_inspect: Optional[Union[object, List[object]]] = None, 

836 omit_kwarg: Sequence[str] = (), 

837) -> str: 

838 """Produce a __repr__() based on direct association of the __init__() 

839 specification vs. same-named attributes present. 

840 

841 """ 

842 if to_inspect is None: 

843 to_inspect = [obj] 

844 else: 

845 to_inspect = _collections.to_list(to_inspect) 

846 

847 missing = object() 

848 

849 pos_args = [] 

850 kw_args: _collections.OrderedDict[str, Any] = _collections.OrderedDict() 

851 vargs = None 

852 for i, insp in enumerate(to_inspect): 

853 try: 

854 spec = compat.inspect_getfullargspec(insp.__init__) 

855 except TypeError: 

856 continue 

857 else: 

858 default_len = len(spec.defaults) if spec.defaults else 0 

859 if i == 0: 

860 if spec.varargs: 

861 vargs = spec.varargs 

862 if default_len: 

863 pos_args.extend(spec.args[1:-default_len]) 

864 else: 

865 pos_args.extend(spec.args[1:]) 

866 else: 

867 kw_args.update( 

868 [(arg, missing) for arg in spec.args[1:-default_len]] 

869 ) 

870 

871 if default_len: 

872 assert spec.defaults 

873 kw_args.update( 

874 [ 

875 (arg, default) 

876 for arg, default in zip( 

877 spec.args[-default_len:], spec.defaults 

878 ) 

879 ] 

880 ) 

881 output: List[str] = [] 

882 

883 output.extend(repr(getattr(obj, arg, None)) for arg in pos_args) 

884 

885 if vargs is not None and hasattr(obj, vargs): 

886 output.extend([repr(val) for val in getattr(obj, vargs)]) 

887 

888 for arg, defval in kw_args.items(): 

889 if arg in omit_kwarg: 

890 continue 

891 try: 

892 val = getattr(obj, arg, missing) 

893 if val is not missing and val != defval: 

894 output.append("%s=%r" % (arg, val)) 

895 except Exception: 

896 pass 

897 

898 if additional_kw: 

899 for arg, defval in additional_kw: 

900 try: 

901 val = getattr(obj, arg, missing) 

902 if val is not missing and val != defval: 

903 output.append("%s=%r" % (arg, val)) 

904 except Exception: 

905 pass 

906 

907 return "%s(%s)" % (obj.__class__.__name__, ", ".join(output)) 

908 

909 

910class portable_instancemethod: 

911 """Turn an instancemethod into a (parent, name) pair 

912 to produce a serializable callable. 

913 

914 """ 

915 

916 __slots__ = "target", "name", "kwargs", "__weakref__" 

917 

918 def __getstate__(self): 

919 return { 

920 "target": self.target, 

921 "name": self.name, 

922 "kwargs": self.kwargs, 

923 } 

924 

925 def __setstate__(self, state): 

926 self.target = state["target"] 

927 self.name = state["name"] 

928 self.kwargs = state.get("kwargs", ()) 

929 

930 def __init__(self, meth, kwargs=()): 

931 self.target = meth.__self__ 

932 self.name = meth.__name__ 

933 self.kwargs = kwargs 

934 

935 def __call__(self, *arg, **kw): 

936 kw.update(self.kwargs) 

937 return getattr(self.target, self.name)(*arg, **kw) 

938 

939 

940def class_hierarchy(cls): 

941 """Return an unordered sequence of all classes related to cls. 

942 

943 Traverses diamond hierarchies. 

944 

945 Fibs slightly: subclasses of builtin types are not returned. Thus 

946 class_hierarchy(class A(object)) returns (A, object), not A plus every 

947 class systemwide that derives from object. 

948 

949 """ 

950 

951 hier = {cls} 

952 process = list(cls.__mro__) 

953 while process: 

954 c = process.pop() 

955 bases = (_ for _ in c.__bases__ if _ not in hier) 

956 

957 for b in bases: 

958 process.append(b) 

959 hier.add(b) 

960 

961 if c.__module__ == "builtins" or not hasattr(c, "__subclasses__"): 

962 continue 

963 

964 for s in [ 

965 _ 

966 for _ in ( 

967 c.__subclasses__() 

968 if not issubclass(c, type) 

969 else c.__subclasses__(c) 

970 ) 

971 if _ not in hier 

972 ]: 

973 process.append(s) 

974 hier.add(s) 

975 return list(hier) 

976 

977 

978def iterate_attributes(cls): 

979 """iterate all the keys and attributes associated 

980 with a class, without using getattr(). 

981 

982 Does not use getattr() so that class-sensitive 

983 descriptors (i.e. property.__get__()) are not called. 

984 

985 """ 

986 keys = dir(cls) 

987 for key in keys: 

988 for c in cls.__mro__: 

989 if key in c.__dict__: 

990 yield (key, c.__dict__[key]) 

991 break 

992 

993 

994def monkeypatch_proxied_specials( 

995 into_cls, 

996 from_cls, 

997 skip=None, 

998 only=None, 

999 name="self.proxy", 

1000 from_instance=None, 

1001): 

1002 """Automates delegation of __specials__ for a proxying type.""" 

1003 

1004 if only: 

1005 dunders = only 

1006 else: 

1007 if skip is None: 

1008 skip = ( 

1009 "__slots__", 

1010 "__del__", 

1011 "__getattribute__", 

1012 "__metaclass__", 

1013 "__getstate__", 

1014 "__setstate__", 

1015 ) 

1016 dunders = [ 

1017 m 

1018 for m in dir(from_cls) 

1019 if ( 

1020 m.startswith("__") 

1021 and m.endswith("__") 

1022 and not hasattr(into_cls, m) 

1023 and m not in skip 

1024 ) 

1025 ] 

1026 

1027 for method in dunders: 

1028 try: 

1029 maybe_fn = getattr(from_cls, method) 

1030 if not hasattr(maybe_fn, "__call__"): 

1031 continue 

1032 maybe_fn = getattr(maybe_fn, "__func__", maybe_fn) 

1033 fn = cast(types.FunctionType, maybe_fn) 

1034 

1035 except AttributeError: 

1036 continue 

1037 try: 

1038 spec = compat.inspect_getfullargspec(fn) 

1039 fn_args = compat.inspect_formatargspec(spec[0]) 

1040 d_args = compat.inspect_formatargspec(spec[0][1:]) 

1041 except TypeError: 

1042 fn_args = "(self, *args, **kw)" 

1043 d_args = "(*args, **kw)" 

1044 

1045 py = ( 

1046 "def %(method)s%(fn_args)s: " 

1047 "return %(name)s.%(method)s%(d_args)s" % locals() 

1048 ) 

1049 

1050 env: Dict[str, types.FunctionType] = ( 

1051 from_instance is not None and {name: from_instance} or {} 

1052 ) 

1053 exec(py, env) 

1054 try: 

1055 env[method].__defaults__ = fn.__defaults__ 

1056 except AttributeError: 

1057 pass 

1058 setattr(into_cls, method, env[method]) 

1059 

1060 

1061def methods_equivalent(meth1, meth2): 

1062 """Return True if the two methods are the same implementation.""" 

1063 

1064 return getattr(meth1, "__func__", meth1) is getattr( 

1065 meth2, "__func__", meth2 

1066 ) 

1067 

1068 

1069def as_interface(obj, cls=None, methods=None, required=None): 

1070 """Ensure basic interface compliance for an instance or dict of callables. 

1071 

1072 Checks that ``obj`` implements public methods of ``cls`` or has members 

1073 listed in ``methods``. If ``required`` is not supplied, implementing at 

1074 least one interface method is sufficient. Methods present on ``obj`` that 

1075 are not in the interface are ignored. 

1076 

1077 If ``obj`` is a dict and ``dict`` does not meet the interface 

1078 requirements, the keys of the dictionary are inspected. Keys present in 

1079 ``obj`` that are not in the interface will raise TypeErrors. 

1080 

1081 Raises TypeError if ``obj`` does not meet the interface criteria. 

1082 

1083 In all passing cases, an object with callable members is returned. In the 

1084 simple case, ``obj`` is returned as-is; if dict processing kicks in then 

1085 an anonymous class is returned. 

1086 

1087 obj 

1088 A type, instance, or dictionary of callables. 

1089 cls 

1090 Optional, a type. All public methods of cls are considered the 

1091 interface. An ``obj`` instance of cls will always pass, ignoring 

1092 ``required``.. 

1093 methods 

1094 Optional, a sequence of method names to consider as the interface. 

1095 required 

1096 Optional, a sequence of mandatory implementations. If omitted, an 

1097 ``obj`` that provides at least one interface method is considered 

1098 sufficient. As a convenience, required may be a type, in which case 

1099 all public methods of the type are required. 

1100 

1101 """ 

1102 if not cls and not methods: 

1103 raise TypeError("a class or collection of method names are required") 

1104 

1105 if isinstance(cls, type) and isinstance(obj, cls): 

1106 return obj 

1107 

1108 interface = set(methods or [m for m in dir(cls) if not m.startswith("_")]) 

1109 implemented = set(dir(obj)) 

1110 

1111 complies = operator.ge 

1112 if isinstance(required, type): 

1113 required = interface 

1114 elif not required: 

1115 required = set() 

1116 complies = operator.gt 

1117 else: 

1118 required = set(required) 

1119 

1120 if complies(implemented.intersection(interface), required): 

1121 return obj 

1122 

1123 # No dict duck typing here. 

1124 if not isinstance(obj, dict): 

1125 qualifier = complies is operator.gt and "any of" or "all of" 

1126 raise TypeError( 

1127 "%r does not implement %s: %s" 

1128 % (obj, qualifier, ", ".join(interface)) 

1129 ) 

1130 

1131 class AnonymousInterface: 

1132 """A callable-holding shell.""" 

1133 

1134 if cls: 

1135 AnonymousInterface.__name__ = "Anonymous" + cls.__name__ 

1136 found = set() 

1137 

1138 for method, impl in dictlike_iteritems(obj): 

1139 if method not in interface: 

1140 raise TypeError("%r: unknown in this interface" % method) 

1141 if not callable(impl): 

1142 raise TypeError("%r=%r is not callable" % (method, impl)) 

1143 setattr(AnonymousInterface, method, staticmethod(impl)) 

1144 found.add(method) 

1145 

1146 if complies(found, required): 

1147 return AnonymousInterface 

1148 

1149 raise TypeError( 

1150 "dictionary does not contain required keys %s" 

1151 % ", ".join(required - found) 

1152 ) 

1153 

1154 

1155_GFD = TypeVar("_GFD", bound="generic_fn_descriptor[Any]") 

1156 

1157 

1158class generic_fn_descriptor(Generic[_T_co]): 

1159 """Descriptor which proxies a function when the attribute is not 

1160 present in dict 

1161 

1162 This superclass is organized in a particular way with "memoized" and 

1163 "non-memoized" implementation classes that are hidden from type checkers, 

1164 as Mypy seems to not be able to handle seeing multiple kinds of descriptor 

1165 classes used for the same attribute. 

1166 

1167 """ 

1168 

1169 fget: Callable[..., _T_co] 

1170 __doc__: Optional[str] 

1171 __name__: str 

1172 

1173 def __init__(self, fget: Callable[..., _T_co], doc: Optional[str] = None): 

1174 self.fget = fget 

1175 self.__doc__ = doc or fget.__doc__ 

1176 self.__name__ = fget.__name__ 

1177 

1178 @overload 

1179 def __get__(self: _GFD, obj: None, cls: Any) -> _GFD: ... 

1180 

1181 @overload 

1182 def __get__(self, obj: object, cls: Any) -> _T_co: ... 

1183 

1184 def __get__(self: _GFD, obj: Any, cls: Any) -> Union[_GFD, _T_co]: 

1185 raise NotImplementedError() 

1186 

1187 if TYPE_CHECKING: 

1188 

1189 def __set__(self, instance: Any, value: Any) -> None: ... 

1190 

1191 def __delete__(self, instance: Any) -> None: ... 

1192 

1193 def _reset(self, obj: Any) -> None: 

1194 raise NotImplementedError() 

1195 

1196 @classmethod 

1197 def reset(cls, obj: Any, name: str) -> None: 

1198 raise NotImplementedError() 

1199 

1200 

1201class _non_memoized_property(generic_fn_descriptor[_T_co]): 

1202 """a plain descriptor that proxies a function. 

1203 

1204 primary rationale is to provide a plain attribute that's 

1205 compatible with memoized_property which is also recognized as equivalent 

1206 by mypy. 

1207 

1208 """ 

1209 

1210 if not TYPE_CHECKING: 

1211 

1212 def __get__(self, obj, cls): 

1213 if obj is None: 

1214 return self 

1215 return self.fget(obj) 

1216 

1217 

1218class _memoized_property(generic_fn_descriptor[_T_co]): 

1219 """A read-only @property that is only evaluated once.""" 

1220 

1221 if not TYPE_CHECKING: 

1222 

1223 def __get__(self, obj, cls): 

1224 if obj is None: 

1225 return self 

1226 obj.__dict__[self.__name__] = result = self.fget(obj) 

1227 return result 

1228 

1229 def _reset(self, obj): 

1230 _memoized_property.reset(obj, self.__name__) 

1231 

1232 @classmethod 

1233 def reset(cls, obj, name): 

1234 obj.__dict__.pop(name, None) 

1235 

1236 

1237# despite many attempts to get Mypy to recognize an overridden descriptor 

1238# where one is memoized and the other isn't, there seems to be no reliable 

1239# way other than completely deceiving the type checker into thinking there 

1240# is just one single descriptor type everywhere. Otherwise, if a superclass 

1241# has non-memoized and subclass has memoized, that requires 

1242# "class memoized(non_memoized)". but then if a superclass has memoized and 

1243# superclass has non-memoized, the class hierarchy of the descriptors 

1244# would need to be reversed; "class non_memoized(memoized)". so there's no 

1245# way to achieve this. 

1246# additional issues, RO properties: 

1247# https://github.com/python/mypy/issues/12440 

1248if TYPE_CHECKING: 

1249 # allow memoized and non-memoized to be freely mixed by having them 

1250 # be the same class 

1251 memoized_property = generic_fn_descriptor 

1252 non_memoized_property = generic_fn_descriptor 

1253 

1254 # for read only situations, mypy only sees @property as read only. 

1255 # read only is needed when a subtype specializes the return type 

1256 # of a property, meaning assignment needs to be disallowed 

1257 ro_memoized_property = property 

1258 ro_non_memoized_property = property 

1259 

1260else: 

1261 memoized_property = ro_memoized_property = _memoized_property 

1262 non_memoized_property = ro_non_memoized_property = _non_memoized_property 

1263 

1264 

1265def memoized_instancemethod(fn: _F) -> _F: 

1266 """Decorate a method memoize its return value. 

1267 

1268 Best applied to no-arg methods: memoization is not sensitive to 

1269 argument values, and will always return the same value even when 

1270 called with different arguments. 

1271 

1272 """ 

1273 

1274 def oneshot(self, *args, **kw): 

1275 result = fn(self, *args, **kw) 

1276 

1277 def memo(*a, **kw): 

1278 return result 

1279 

1280 memo.__name__ = fn.__name__ 

1281 memo.__doc__ = fn.__doc__ 

1282 self.__dict__[fn.__name__] = memo 

1283 return result 

1284 

1285 return update_wrapper(oneshot, fn) # type: ignore 

1286 

1287 

1288class HasMemoized: 

1289 """A mixin class that maintains the names of memoized elements in a 

1290 collection for easy cache clearing, generative, etc. 

1291 

1292 """ 

1293 

1294 if not TYPE_CHECKING: 

1295 # support classes that want to have __slots__ with an explicit 

1296 # slot for __dict__. not sure if that requires base __slots__ here. 

1297 __slots__ = () 

1298 

1299 _memoized_keys: FrozenSet[str] = frozenset() 

1300 

1301 def _reset_memoizations(self) -> None: 

1302 for elem in self._memoized_keys: 

1303 self.__dict__.pop(elem, None) 

1304 

1305 def _assert_no_memoizations(self) -> None: 

1306 for elem in self._memoized_keys: 

1307 assert elem not in self.__dict__ 

1308 

1309 def _set_memoized_attribute(self, key: str, value: Any) -> None: 

1310 self.__dict__[key] = value 

1311 self._memoized_keys |= {key} 

1312 

1313 class memoized_attribute(memoized_property[_T]): 

1314 """A read-only @property that is only evaluated once. 

1315 

1316 :meta private: 

1317 

1318 """ 

1319 

1320 fget: Callable[..., _T] 

1321 __doc__: Optional[str] 

1322 __name__: str 

1323 

1324 def __init__(self, fget: Callable[..., _T], doc: Optional[str] = None): 

1325 self.fget = fget 

1326 self.__doc__ = doc or fget.__doc__ 

1327 self.__name__ = fget.__name__ 

1328 

1329 @overload 

1330 def __get__(self: _MA, obj: None, cls: Any) -> _MA: ... 

1331 

1332 @overload 

1333 def __get__(self, obj: Any, cls: Any) -> _T: ... 

1334 

1335 def __get__(self, obj, cls): 

1336 if obj is None: 

1337 return self 

1338 obj.__dict__[self.__name__] = result = self.fget(obj) 

1339 obj._memoized_keys |= {self.__name__} 

1340 return result 

1341 

1342 @classmethod 

1343 def memoized_instancemethod(cls, fn: _F) -> _F: 

1344 """Decorate a method memoize its return value. 

1345 

1346 :meta private: 

1347 

1348 """ 

1349 

1350 def oneshot(self: Any, *args: Any, **kw: Any) -> Any: 

1351 result = fn(self, *args, **kw) 

1352 

1353 def memo(*a, **kw): 

1354 return result 

1355 

1356 memo.__name__ = fn.__name__ 

1357 memo.__doc__ = fn.__doc__ 

1358 self.__dict__[fn.__name__] = memo 

1359 self._memoized_keys |= {fn.__name__} 

1360 return result 

1361 

1362 return update_wrapper(oneshot, fn) # type: ignore 

1363 

1364 

1365if TYPE_CHECKING: 

1366 HasMemoized_ro_memoized_attribute = property 

1367else: 

1368 HasMemoized_ro_memoized_attribute = HasMemoized.memoized_attribute 

1369 

1370 

1371class MemoizedSlots: 

1372 """Apply memoized items to an object using a __getattr__ scheme. 

1373 

1374 This allows the functionality of memoized_property and 

1375 memoized_instancemethod to be available to a class using __slots__. 

1376 

1377 """ 

1378 

1379 __slots__ = () 

1380 

1381 def _fallback_getattr(self, key): 

1382 raise AttributeError(key) 

1383 

1384 def __getattr__(self, key: str) -> Any: 

1385 if key.startswith("_memoized_attr_") or key.startswith( 

1386 "_memoized_method_" 

1387 ): 

1388 raise AttributeError(key) 

1389 # to avoid recursion errors when interacting with other __getattr__ 

1390 # schemes that refer to this one, when testing for memoized method 

1391 # look at __class__ only rather than going into __getattr__ again. 

1392 elif hasattr(self.__class__, f"_memoized_attr_{key}"): 

1393 value = getattr(self, f"_memoized_attr_{key}")() 

1394 setattr(self, key, value) 

1395 return value 

1396 elif hasattr(self.__class__, f"_memoized_method_{key}"): 

1397 fn = getattr(self, f"_memoized_method_{key}") 

1398 

1399 def oneshot(*args, **kw): 

1400 result = fn(*args, **kw) 

1401 

1402 def memo(*a, **kw): 

1403 return result 

1404 

1405 memo.__name__ = fn.__name__ 

1406 memo.__doc__ = fn.__doc__ 

1407 setattr(self, key, memo) 

1408 return result 

1409 

1410 oneshot.__doc__ = fn.__doc__ 

1411 return oneshot 

1412 else: 

1413 return self._fallback_getattr(key) 

1414 

1415 

1416# from paste.deploy.converters 

1417def asbool(obj: Any) -> bool: 

1418 if isinstance(obj, str): 

1419 obj = obj.strip().lower() 

1420 if obj in ["true", "yes", "on", "y", "t", "1"]: 

1421 return True 

1422 elif obj in ["false", "no", "off", "n", "f", "0"]: 

1423 return False 

1424 else: 

1425 raise ValueError("String is not true/false: %r" % obj) 

1426 return bool(obj) 

1427 

1428 

1429def bool_or_str(*text: str) -> Callable[[str], Union[str, bool]]: 

1430 """Return a callable that will evaluate a string as 

1431 boolean, or one of a set of "alternate" string values. 

1432 

1433 """ 

1434 

1435 def bool_or_value(obj: str) -> Union[str, bool]: 

1436 if obj in text: 

1437 return obj 

1438 else: 

1439 return asbool(obj) 

1440 

1441 return bool_or_value 

1442 

1443 

1444def asint(value: Any) -> Optional[int]: 

1445 """Coerce to integer.""" 

1446 

1447 if value is None: 

1448 return value 

1449 return int(value) 

1450 

1451 

1452def coerce_kw_type( 

1453 kw: Dict[str, Any], 

1454 key: str, 

1455 type_: Type[Any], 

1456 flexi_bool: bool = True, 

1457 dest: Optional[Dict[str, Any]] = None, 

1458) -> None: 

1459 r"""If 'key' is present in dict 'kw', coerce its value to type 'type\_' if 

1460 necessary. If 'flexi_bool' is True, the string '0' is considered false 

1461 when coercing to boolean. 

1462 """ 

1463 

1464 if dest is None: 

1465 dest = kw 

1466 

1467 if ( 

1468 key in kw 

1469 and (not isinstance(type_, type) or not isinstance(kw[key], type_)) 

1470 and kw[key] is not None 

1471 ): 

1472 if type_ is bool and flexi_bool: 

1473 dest[key] = asbool(kw[key]) 

1474 else: 

1475 dest[key] = type_(kw[key]) 

1476 

1477 

1478def constructor_key(obj: Any, cls: Type[Any]) -> Tuple[Any, ...]: 

1479 """Produce a tuple structure that is cacheable using the __dict__ of 

1480 obj to retrieve values 

1481 

1482 """ 

1483 names = get_cls_kwargs(cls) 

1484 return (cls,) + tuple( 

1485 (k, obj.__dict__[k]) for k in names if k in obj.__dict__ 

1486 ) 

1487 

1488 

1489def constructor_copy(obj: _T, cls: Type[_T], *args: Any, **kw: Any) -> _T: 

1490 """Instantiate cls using the __dict__ of obj as constructor arguments. 

1491 

1492 Uses inspect to match the named arguments of ``cls``. 

1493 

1494 """ 

1495 

1496 names = get_cls_kwargs(cls) 

1497 kw.update( 

1498 (k, obj.__dict__[k]) for k in names.difference(kw) if k in obj.__dict__ 

1499 ) 

1500 return cls(*args, **kw) 

1501 

1502 

1503def counter() -> Callable[[], int]: 

1504 """Return a threadsafe counter function.""" 

1505 

1506 lock = threading.Lock() 

1507 counter = itertools.count(1) 

1508 

1509 # avoid the 2to3 "next" transformation... 

1510 def _next(): 

1511 with lock: 

1512 return next(counter) 

1513 

1514 return _next 

1515 

1516 

1517def duck_type_collection( 

1518 specimen: Any, default: Optional[Type[Any]] = None 

1519) -> Optional[Type[Any]]: 

1520 """Given an instance or class, guess if it is or is acting as one of 

1521 the basic collection types: list, set and dict. If the __emulates__ 

1522 property is present, return that preferentially. 

1523 """ 

1524 

1525 if hasattr(specimen, "__emulates__"): 

1526 # canonicalize set vs sets.Set to a standard: the builtin set 

1527 if specimen.__emulates__ is not None and issubclass( 

1528 specimen.__emulates__, set 

1529 ): 

1530 return set 

1531 else: 

1532 return specimen.__emulates__ # type: ignore 

1533 

1534 isa = issubclass if isinstance(specimen, type) else isinstance 

1535 if isa(specimen, list): 

1536 return list 

1537 elif isa(specimen, set): 

1538 return set 

1539 elif isa(specimen, dict): 

1540 return dict 

1541 

1542 if hasattr(specimen, "append"): 

1543 return list 

1544 elif hasattr(specimen, "add"): 

1545 return set 

1546 elif hasattr(specimen, "set"): 

1547 return dict 

1548 else: 

1549 return default 

1550 

1551 

1552def assert_arg_type( 

1553 arg: Any, argtype: Union[Tuple[Type[Any], ...], Type[Any]], name: str 

1554) -> Any: 

1555 if isinstance(arg, argtype): 

1556 return arg 

1557 else: 

1558 if isinstance(argtype, tuple): 

1559 raise exc.ArgumentError( 

1560 "Argument '%s' is expected to be one of type %s, got '%s'" 

1561 % (name, " or ".join("'%s'" % a for a in argtype), type(arg)) 

1562 ) 

1563 else: 

1564 raise exc.ArgumentError( 

1565 "Argument '%s' is expected to be of type '%s', got '%s'" 

1566 % (name, argtype, type(arg)) 

1567 ) 

1568 

1569 

1570def dictlike_iteritems(dictlike): 

1571 """Return a (key, value) iterator for almost any dict-like object.""" 

1572 

1573 if hasattr(dictlike, "items"): 

1574 return list(dictlike.items()) 

1575 

1576 getter = getattr(dictlike, "__getitem__", getattr(dictlike, "get", None)) 

1577 if getter is None: 

1578 raise TypeError("Object '%r' is not dict-like" % dictlike) 

1579 

1580 if hasattr(dictlike, "iterkeys"): 

1581 

1582 def iterator(): 

1583 for key in dictlike.iterkeys(): 

1584 assert getter is not None 

1585 yield key, getter(key) 

1586 

1587 return iterator() 

1588 elif hasattr(dictlike, "keys"): 

1589 return iter((key, getter(key)) for key in dictlike.keys()) 

1590 else: 

1591 raise TypeError("Object '%r' is not dict-like" % dictlike) 

1592 

1593 

1594class classproperty(property): 

1595 """A decorator that behaves like @property except that operates 

1596 on classes rather than instances. 

1597 

1598 The decorator is currently special when using the declarative 

1599 module, but note that the 

1600 :class:`~.sqlalchemy.ext.declarative.declared_attr` 

1601 decorator should be used for this purpose with declarative. 

1602 

1603 """ 

1604 

1605 fget: Callable[[Any], Any] 

1606 

1607 def __init__(self, fget: Callable[[Any], Any], *arg: Any, **kw: Any): 

1608 super().__init__(fget, *arg, **kw) 

1609 self.__doc__ = fget.__doc__ 

1610 

1611 def __get__(self, obj: Any, cls: Optional[type] = None) -> Any: 

1612 return self.fget(cls) 

1613 

1614 

1615class hybridproperty(Generic[_T]): 

1616 def __init__(self, func: Callable[..., _T]): 

1617 self.func = func 

1618 self.clslevel = func 

1619 

1620 def __get__(self, instance: Any, owner: Any) -> _T: 

1621 if instance is None: 

1622 clsval = self.clslevel(owner) 

1623 return clsval 

1624 else: 

1625 return self.func(instance) 

1626 

1627 def classlevel(self, func: Callable[..., Any]) -> hybridproperty[_T]: 

1628 self.clslevel = func 

1629 return self 

1630 

1631 

1632class rw_hybridproperty(Generic[_T]): 

1633 def __init__(self, func: Callable[..., _T]): 

1634 self.func = func 

1635 self.clslevel = func 

1636 self.setfn: Optional[Callable[..., Any]] = None 

1637 

1638 def __get__(self, instance: Any, owner: Any) -> _T: 

1639 if instance is None: 

1640 clsval = self.clslevel(owner) 

1641 return clsval 

1642 else: 

1643 return self.func(instance) 

1644 

1645 def __set__(self, instance: Any, value: Any) -> None: 

1646 assert self.setfn is not None 

1647 self.setfn(instance, value) 

1648 

1649 def setter(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]: 

1650 self.setfn = func 

1651 return self 

1652 

1653 def classlevel(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]: 

1654 self.clslevel = func 

1655 return self 

1656 

1657 

1658class hybridmethod(Generic[_T]): 

1659 """Decorate a function as cls- or instance- level.""" 

1660 

1661 def __init__(self, func: Callable[..., _T]): 

1662 self.func = self.__func__ = func 

1663 self.clslevel = func 

1664 

1665 def __get__(self, instance: Any, owner: Any) -> Callable[..., _T]: 

1666 if instance is None: 

1667 return self.clslevel.__get__(owner, owner.__class__) # type:ignore 

1668 else: 

1669 return self.func.__get__(instance, owner) # type:ignore 

1670 

1671 def classlevel(self, func: Callable[..., Any]) -> hybridmethod[_T]: 

1672 self.clslevel = func 

1673 return self 

1674 

1675 

1676class symbol(int): 

1677 """A constant symbol. 

1678 

1679 >>> symbol("foo") is symbol("foo") 

1680 True 

1681 >>> symbol("foo") 

1682 <symbol 'foo> 

1683 

1684 A slight refinement of the MAGICCOOKIE=object() pattern. The primary 

1685 advantage of symbol() is its repr(). They are also singletons. 

1686 

1687 Repeated calls of symbol('name') will all return the same instance. 

1688 

1689 """ 

1690 

1691 name: str 

1692 

1693 symbols: Dict[str, symbol] = {} 

1694 _lock = threading.Lock() 

1695 

1696 def __new__( 

1697 cls, 

1698 name: str, 

1699 doc: Optional[str] = None, 

1700 canonical: Optional[int] = None, 

1701 ) -> symbol: 

1702 with cls._lock: 

1703 sym = cls.symbols.get(name) 

1704 if sym is None: 

1705 assert isinstance(name, str) 

1706 if canonical is None: 

1707 canonical = hash(name) 

1708 sym = int.__new__(symbol, canonical) 

1709 sym.name = name 

1710 if doc: 

1711 sym.__doc__ = doc 

1712 

1713 # NOTE: we should ultimately get rid of this global thing, 

1714 # however, currently it is to support pickling. The best 

1715 # change would be when we are on py3.11 at a minimum, we 

1716 # switch to stdlib enum.IntFlag. 

1717 cls.symbols[name] = sym 

1718 else: 

1719 if canonical and canonical != sym: 

1720 raise TypeError( 

1721 f"Can't replace canonical symbol for {name!r} " 

1722 f"with new int value {canonical}" 

1723 ) 

1724 return sym 

1725 

1726 def __reduce__(self): 

1727 return symbol, (self.name, "x", int(self)) 

1728 

1729 def __str__(self): 

1730 return repr(self) 

1731 

1732 def __repr__(self): 

1733 return f"symbol({self.name!r})" 

1734 

1735 

1736class _IntFlagMeta(type): 

1737 def __init__( 

1738 cls, 

1739 classname: str, 

1740 bases: Tuple[Type[Any], ...], 

1741 dict_: Dict[str, Any], 

1742 **kw: Any, 

1743 ) -> None: 

1744 items: List[symbol] 

1745 cls._items = items = [] 

1746 for k, v in dict_.items(): 

1747 if re.match(r"^__.*__$", k): 

1748 continue 

1749 if isinstance(v, int): 

1750 sym = symbol(k, canonical=v) 

1751 elif not k.startswith("_"): 

1752 raise TypeError("Expected integer values for IntFlag") 

1753 else: 

1754 continue 

1755 setattr(cls, k, sym) 

1756 items.append(sym) 

1757 

1758 cls.__members__ = _collections.immutabledict( 

1759 {sym.name: sym for sym in items} 

1760 ) 

1761 

1762 def __iter__(self) -> Iterator[symbol]: 

1763 raise NotImplementedError( 

1764 "iter not implemented to ensure compatibility with " 

1765 "Python 3.11 IntFlag. Please use __members__. See " 

1766 "https://github.com/python/cpython/issues/99304" 

1767 ) 

1768 

1769 

1770class _FastIntFlag(metaclass=_IntFlagMeta): 

1771 """An 'IntFlag' copycat that isn't slow when performing bitwise 

1772 operations. 

1773 

1774 the ``FastIntFlag`` class will return ``enum.IntFlag`` under TYPE_CHECKING 

1775 and ``_FastIntFlag`` otherwise. 

1776 

1777 """ 

1778 

1779 

1780if TYPE_CHECKING: 

1781 from enum import IntFlag 

1782 

1783 FastIntFlag = IntFlag 

1784else: 

1785 FastIntFlag = _FastIntFlag 

1786 

1787 

1788_E = TypeVar("_E", bound=enum.Enum) 

1789 

1790 

1791def parse_user_argument_for_enum( 

1792 arg: Any, 

1793 choices: Dict[_E, List[Any]], 

1794 name: str, 

1795 resolve_symbol_names: bool = False, 

1796) -> Optional[_E]: 

1797 """Given a user parameter, parse the parameter into a chosen value 

1798 from a list of choice objects, typically Enum values. 

1799 

1800 The user argument can be a string name that matches the name of a 

1801 symbol, or the symbol object itself, or any number of alternate choices 

1802 such as True/False/ None etc. 

1803 

1804 :param arg: the user argument. 

1805 :param choices: dictionary of enum values to lists of possible 

1806 entries for each. 

1807 :param name: name of the argument. Used in an :class:`.ArgumentError` 

1808 that is raised if the parameter doesn't match any available argument. 

1809 

1810 """ 

1811 for enum_value, choice in choices.items(): 

1812 if arg is enum_value: 

1813 return enum_value 

1814 elif resolve_symbol_names and arg == enum_value.name: 

1815 return enum_value 

1816 elif arg in choice: 

1817 return enum_value 

1818 

1819 if arg is None: 

1820 return None 

1821 

1822 raise exc.ArgumentError(f"Invalid value for '{name}': {arg!r}") 

1823 

1824 

1825_creation_order = 1 

1826 

1827 

1828def set_creation_order(instance: Any) -> None: 

1829 """Assign a '_creation_order' sequence to the given instance. 

1830 

1831 This allows multiple instances to be sorted in order of creation 

1832 (typically within a single thread; the counter is not particularly 

1833 threadsafe). 

1834 

1835 """ 

1836 global _creation_order 

1837 instance._creation_order = _creation_order 

1838 _creation_order += 1 

1839 

1840 

1841def warn_exception(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: 

1842 """executes the given function, catches all exceptions and converts to 

1843 a warning. 

1844 

1845 """ 

1846 try: 

1847 return func(*args, **kwargs) 

1848 except Exception: 

1849 warn("%s('%s') ignored" % sys.exc_info()[0:2]) 

1850 

1851 

1852def ellipses_string(value, len_=25): 

1853 try: 

1854 if len(value) > len_: 

1855 return "%s..." % value[0:len_] 

1856 else: 

1857 return value 

1858 except TypeError: 

1859 return value 

1860 

1861 

1862class _hash_limit_string(str): 

1863 """A string subclass that can only be hashed on a maximum amount 

1864 of unique values. 

1865 

1866 This is used for warnings so that we can send out parameterized warnings 

1867 without the __warningregistry__ of the module, or the non-overridable 

1868 "once" registry within warnings.py, overloading memory, 

1869 

1870 

1871 """ 

1872 

1873 _hash: int 

1874 

1875 def __new__( 

1876 cls, value: str, num: int, args: Sequence[Any] 

1877 ) -> _hash_limit_string: 

1878 interpolated = (value % args) + ( 

1879 " (this warning may be suppressed after %d occurrences)" % num 

1880 ) 

1881 self = super().__new__(cls, interpolated) 

1882 self._hash = hash("%s_%d" % (value, hash(interpolated) % num)) 

1883 return self 

1884 

1885 def __hash__(self) -> int: 

1886 return self._hash 

1887 

1888 def __eq__(self, other: Any) -> bool: 

1889 return hash(self) == hash(other) 

1890 

1891 

1892def warn(msg: str, code: Optional[str] = None) -> None: 

1893 """Issue a warning. 

1894 

1895 If msg is a string, :class:`.exc.SAWarning` is used as 

1896 the category. 

1897 

1898 """ 

1899 if code: 

1900 _warnings_warn(exc.SAWarning(msg, code=code)) 

1901 else: 

1902 _warnings_warn(msg, exc.SAWarning) 

1903 

1904 

1905def warn_limited(msg: str, args: Sequence[Any]) -> None: 

1906 """Issue a warning with a parameterized string, limiting the number 

1907 of registrations. 

1908 

1909 """ 

1910 if args: 

1911 msg = _hash_limit_string(msg, 10, args) 

1912 _warnings_warn(msg, exc.SAWarning) 

1913 

1914 

1915_warning_tags: Dict[CodeType, Tuple[str, Type[Warning]]] = {} 

1916 

1917 

1918def tag_method_for_warnings( 

1919 message: str, category: Type[Warning] 

1920) -> Callable[[_F], _F]: 

1921 def go(fn): 

1922 _warning_tags[fn.__code__] = (message, category) 

1923 return fn 

1924 

1925 return go 

1926 

1927 

1928_not_sa_pattern = re.compile(r"^(?:sqlalchemy\.(?!testing)|alembic\.)") 

1929 

1930 

1931def _warnings_warn( 

1932 message: Union[str, Warning], 

1933 category: Optional[Type[Warning]] = None, 

1934 stacklevel: int = 2, 

1935) -> None: 

1936 # adjust the given stacklevel to be outside of SQLAlchemy 

1937 try: 

1938 frame = sys._getframe(stacklevel) 

1939 except ValueError: 

1940 # being called from less than 3 (or given) stacklevels, weird, 

1941 # but don't crash 

1942 stacklevel = 0 

1943 except: 

1944 # _getframe() doesn't work, weird interpreter issue, weird, 

1945 # ok, but don't crash 

1946 stacklevel = 0 

1947 else: 

1948 stacklevel_found = warning_tag_found = False 

1949 while frame is not None: 

1950 # using __name__ here requires that we have __name__ in the 

1951 # __globals__ of the decorated string functions we make also. 

1952 # we generate this using {"__name__": fn.__module__} 

1953 if not stacklevel_found and not re.match( 

1954 _not_sa_pattern, frame.f_globals.get("__name__", "") 

1955 ): 

1956 # stop incrementing stack level if an out-of-SQLA line 

1957 # were found. 

1958 stacklevel_found = True 

1959 

1960 # however, for the warning tag thing, we have to keep 

1961 # scanning up the whole traceback 

1962 

1963 if frame.f_code in _warning_tags: 

1964 warning_tag_found = True 

1965 (_suffix, _category) = _warning_tags[frame.f_code] 

1966 category = category or _category 

1967 message = f"{message} ({_suffix})" 

1968 

1969 frame = frame.f_back # type: ignore[assignment] 

1970 

1971 if not stacklevel_found: 

1972 stacklevel += 1 

1973 elif stacklevel_found and warning_tag_found: 

1974 break 

1975 

1976 if category is not None: 

1977 warnings.warn(message, category, stacklevel=stacklevel + 1) 

1978 else: 

1979 warnings.warn(message, stacklevel=stacklevel + 1) 

1980 

1981 

1982def only_once( 

1983 fn: Callable[..., _T], retry_on_exception: bool 

1984) -> Callable[..., Optional[_T]]: 

1985 """Decorate the given function to be a no-op after it is called exactly 

1986 once.""" 

1987 

1988 once = [fn] 

1989 

1990 def go(*arg: Any, **kw: Any) -> Optional[_T]: 

1991 # strong reference fn so that it isn't garbage collected, 

1992 # which interferes with the event system's expectations 

1993 strong_fn = fn # noqa 

1994 if once: 

1995 once_fn = once.pop() 

1996 try: 

1997 return once_fn(*arg, **kw) 

1998 except: 

1999 if retry_on_exception: 

2000 once.insert(0, once_fn) 

2001 raise 

2002 

2003 return None 

2004 

2005 return go 

2006 

2007 

2008_SQLA_RE = re.compile(r"sqlalchemy/([a-z_]+/){0,2}[a-z_]+\.py") 

2009_UNITTEST_RE = re.compile(r"unit(?:2|test2?/)") 

2010 

2011 

2012def chop_traceback( 

2013 tb: List[str], 

2014 exclude_prefix: re.Pattern[str] = _UNITTEST_RE, 

2015 exclude_suffix: re.Pattern[str] = _SQLA_RE, 

2016) -> List[str]: 

2017 """Chop extraneous lines off beginning and end of a traceback. 

2018 

2019 :param tb: 

2020 a list of traceback lines as returned by ``traceback.format_stack()`` 

2021 

2022 :param exclude_prefix: 

2023 a regular expression object matching lines to skip at beginning of 

2024 ``tb`` 

2025 

2026 :param exclude_suffix: 

2027 a regular expression object matching lines to skip at end of ``tb`` 

2028 """ 

2029 start = 0 

2030 end = len(tb) - 1 

2031 while start <= end and exclude_prefix.search(tb[start]): 

2032 start += 1 

2033 while start <= end and exclude_suffix.search(tb[end]): 

2034 end -= 1 

2035 return tb[start : end + 1] 

2036 

2037 

2038NoneType = type(None) 

2039 

2040 

2041def attrsetter(attrname): 

2042 code = "def set(obj, value): obj.%s = value" % attrname 

2043 env = locals().copy() 

2044 exec(code, env) 

2045 return env["set"] 

2046 

2047 

2048_dunders = re.compile("^__.+__$") 

2049 

2050 

2051class TypingOnly: 

2052 """A mixin class that marks a class as 'typing only', meaning it has 

2053 absolutely no methods, attributes, or runtime functionality whatsoever. 

2054 

2055 """ 

2056 

2057 __slots__ = () 

2058 

2059 def __init_subclass__(cls) -> None: 

2060 if TypingOnly in cls.__bases__: 

2061 remaining = { 

2062 name for name in cls.__dict__ if not _dunders.match(name) 

2063 } 

2064 if remaining: 

2065 raise AssertionError( 

2066 f"Class {cls} directly inherits TypingOnly but has " 

2067 f"additional attributes {remaining}." 

2068 ) 

2069 super().__init_subclass__() 

2070 

2071 

2072class EnsureKWArg: 

2073 r"""Apply translation of functions to accept \**kw arguments if they 

2074 don't already. 

2075 

2076 Used to ensure cross-compatibility with third party legacy code, for things 

2077 like compiler visit methods that need to accept ``**kw`` arguments, 

2078 but may have been copied from old code that didn't accept them. 

2079 

2080 """ 

2081 

2082 ensure_kwarg: str 

2083 """a regular expression that indicates method names for which the method 

2084 should accept ``**kw`` arguments. 

2085 

2086 The class will scan for methods matching the name template and decorate 

2087 them if necessary to ensure ``**kw`` parameters are accepted. 

2088 

2089 """ 

2090 

2091 def __init_subclass__(cls) -> None: 

2092 fn_reg = cls.ensure_kwarg 

2093 clsdict = cls.__dict__ 

2094 if fn_reg: 

2095 for key in clsdict: 

2096 m = re.match(fn_reg, key) 

2097 if m: 

2098 fn = clsdict[key] 

2099 spec = compat.inspect_getfullargspec(fn) 

2100 if not spec.varkw: 

2101 wrapped = cls._wrap_w_kw(fn) 

2102 setattr(cls, key, wrapped) 

2103 super().__init_subclass__() 

2104 

2105 @classmethod 

2106 def _wrap_w_kw(cls, fn: Callable[..., Any]) -> Callable[..., Any]: 

2107 def wrap(*arg: Any, **kw: Any) -> Any: 

2108 return fn(*arg) 

2109 

2110 return update_wrapper(wrap, fn) 

2111 

2112 

2113def wrap_callable(wrapper, fn): 

2114 """Augment functools.update_wrapper() to work with objects with 

2115 a ``__call__()`` method. 

2116 

2117 :param fn: 

2118 object with __call__ method 

2119 

2120 """ 

2121 if hasattr(fn, "__name__"): 

2122 return update_wrapper(wrapper, fn) 

2123 else: 

2124 _f = wrapper 

2125 _f.__name__ = fn.__class__.__name__ 

2126 if hasattr(fn, "__module__"): 

2127 _f.__module__ = fn.__module__ 

2128 

2129 if hasattr(fn.__call__, "__doc__") and fn.__call__.__doc__: 

2130 _f.__doc__ = fn.__call__.__doc__ 

2131 elif fn.__doc__: 

2132 _f.__doc__ = fn.__doc__ 

2133 

2134 return _f 

2135 

2136 

2137def quoted_token_parser(value): 

2138 """Parse a dotted identifier with accommodation for quoted names. 

2139 

2140 Includes support for SQL-style double quotes as a literal character. 

2141 

2142 E.g.:: 

2143 

2144 >>> quoted_token_parser("name") 

2145 ["name"] 

2146 >>> quoted_token_parser("schema.name") 

2147 ["schema", "name"] 

2148 >>> quoted_token_parser('"Schema"."Name"') 

2149 ['Schema', 'Name'] 

2150 >>> quoted_token_parser('"Schema"."Name""Foo"') 

2151 ['Schema', 'Name""Foo'] 

2152 

2153 """ 

2154 

2155 if '"' not in value: 

2156 return value.split(".") 

2157 

2158 # 0 = outside of quotes 

2159 # 1 = inside of quotes 

2160 state = 0 

2161 result: List[List[str]] = [[]] 

2162 idx = 0 

2163 lv = len(value) 

2164 while idx < lv: 

2165 char = value[idx] 

2166 if char == '"': 

2167 if state == 1 and idx < lv - 1 and value[idx + 1] == '"': 

2168 result[-1].append('"') 

2169 idx += 1 

2170 else: 

2171 state ^= 1 

2172 elif char == "." and state == 0: 

2173 result.append([]) 

2174 else: 

2175 result[-1].append(char) 

2176 idx += 1 

2177 

2178 return ["".join(token) for token in result] 

2179 

2180 

2181def add_parameter_text(params: Any, text: str) -> Callable[[_F], _F]: 

2182 params = _collections.to_list(params) 

2183 

2184 def decorate(fn): 

2185 doc = fn.__doc__ is not None and fn.__doc__ or "" 

2186 if doc: 

2187 doc = inject_param_text(doc, {param: text for param in params}) 

2188 fn.__doc__ = doc 

2189 return fn 

2190 

2191 return decorate 

2192 

2193 

2194def _dedent_docstring(text: str) -> str: 

2195 split_text = text.split("\n", 1) 

2196 if len(split_text) == 1: 

2197 return text 

2198 else: 

2199 firstline, remaining = split_text 

2200 if not firstline.startswith(" "): 

2201 return firstline + "\n" + textwrap.dedent(remaining) 

2202 else: 

2203 return textwrap.dedent(text) 

2204 

2205 

2206def inject_docstring_text( 

2207 given_doctext: Optional[str], injecttext: str, pos: int 

2208) -> str: 

2209 doctext: str = _dedent_docstring(given_doctext or "") 

2210 lines = doctext.split("\n") 

2211 if len(lines) == 1: 

2212 lines.append("") 

2213 injectlines = textwrap.dedent(injecttext).split("\n") 

2214 if injectlines[0]: 

2215 injectlines.insert(0, "") 

2216 

2217 blanks = [num for num, line in enumerate(lines) if not line.strip()] 

2218 blanks.insert(0, 0) 

2219 

2220 inject_pos = blanks[min(pos, len(blanks) - 1)] 

2221 

2222 lines = lines[0:inject_pos] + injectlines + lines[inject_pos:] 

2223 return "\n".join(lines) 

2224 

2225 

2226_param_reg = re.compile(r"(\s+):param (.+?):") 

2227 

2228 

2229def inject_param_text(doctext: str, inject_params: Dict[str, str]) -> str: 

2230 doclines = collections.deque(doctext.splitlines()) 

2231 lines = [] 

2232 

2233 # TODO: this is not working for params like ":param case_sensitive=True:" 

2234 

2235 to_inject = None 

2236 while doclines: 

2237 line = doclines.popleft() 

2238 

2239 m = _param_reg.match(line) 

2240 

2241 if to_inject is None: 

2242 if m: 

2243 param = m.group(2).lstrip("*") 

2244 if param in inject_params: 

2245 # default indent to that of :param: plus one 

2246 indent = " " * len(m.group(1)) + " " 

2247 

2248 # but if the next line has text, use that line's 

2249 # indentation 

2250 if doclines: 

2251 m2 = re.match(r"(\s+)\S", doclines[0]) 

2252 if m2: 

2253 indent = " " * len(m2.group(1)) 

2254 

2255 to_inject = indent + inject_params[param] 

2256 elif m: 

2257 lines.extend(["\n", to_inject, "\n"]) 

2258 to_inject = None 

2259 elif not line.rstrip(): 

2260 lines.extend([line, to_inject, "\n"]) 

2261 to_inject = None 

2262 elif line.endswith("::"): 

2263 # TODO: this still won't cover if the code example itself has 

2264 # blank lines in it, need to detect those via indentation. 

2265 lines.extend([line, doclines.popleft()]) 

2266 continue 

2267 lines.append(line) 

2268 

2269 return "\n".join(lines) 

2270 

2271 

2272def repr_tuple_names(names: List[str]) -> Optional[str]: 

2273 """Trims a list of strings from the middle and return a string of up to 

2274 four elements. Strings greater than 11 characters will be truncated""" 

2275 if len(names) == 0: 

2276 return None 

2277 flag = len(names) <= 4 

2278 names = names[0:4] if flag else names[0:3] + names[-1:] 

2279 res = ["%s.." % name[:11] if len(name) > 11 else name for name in names] 

2280 if flag: 

2281 return ", ".join(res) 

2282 else: 

2283 return "%s, ..., %s" % (", ".join(res[0:3]), res[-1]) 

2284 

2285 

2286def has_compiled_ext(raise_=False): 

2287 if HAS_CYEXTENSION: 

2288 return True 

2289 elif raise_: 

2290 raise ImportError( 

2291 "cython extensions were expected to be installed, " 

2292 "but are not present" 

2293 ) 

2294 else: 

2295 return False 

2296 

2297 

2298class _Missing(enum.Enum): 

2299 Missing = enum.auto() 

2300 

2301 

2302Missing = _Missing.Missing 

2303MissingOr = Union[_T, Literal[_Missing.Missing]]