Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py: 55%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1050 statements  

1# util/langhelpers.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Routines to help with the creation, loading and introspection of 

10modules, classes, hierarchies, attributes, functions, and methods. 

11 

12""" 

13from __future__ import annotations 

14 

15import collections 

16import enum 

17from functools import update_wrapper 

18import inspect 

19import itertools 

20import operator 

21import re 

22import sys 

23import textwrap 

24import threading 

25import types 

26from types import CodeType 

27from typing import Any 

28from typing import Callable 

29from typing import cast 

30from typing import Dict 

31from typing import FrozenSet 

32from typing import Generic 

33from typing import Iterator 

34from typing import List 

35from typing import Mapping 

36from typing import NoReturn 

37from typing import Optional 

38from typing import overload 

39from typing import Sequence 

40from typing import Set 

41from typing import Tuple 

42from typing import Type 

43from typing import TYPE_CHECKING 

44from typing import TypeVar 

45from typing import Union 

46import warnings 

47 

48from . import _collections 

49from . import compat 

50from ._has_cy import HAS_CYEXTENSION 

51from .typing import Literal 

52from .. import exc 

53 

54_T = TypeVar("_T") 

55_T_co = TypeVar("_T_co", covariant=True) 

56_F = TypeVar("_F", bound=Callable[..., Any]) 

57_MP = TypeVar("_MP", bound="memoized_property[Any]") 

58_MA = TypeVar("_MA", bound="HasMemoized.memoized_attribute[Any]") 

59_HP = TypeVar("_HP", bound="hybridproperty[Any]") 

60_HM = TypeVar("_HM", bound="hybridmethod[Any]") 

61 

62 

63if compat.py314: 

64 # vendor a minimal form of get_annotations per 

65 # https://github.com/python/cpython/issues/133684#issuecomment-2863841891 

66 

67 from annotationlib import call_annotate_function # type: ignore 

68 from annotationlib import Format 

69 

70 def _get_and_call_annotate(obj, format): # noqa: A002 

71 annotate = getattr(obj, "__annotate__", None) 

72 if annotate is not None: 

73 ann = call_annotate_function(annotate, format, owner=obj) 

74 if not isinstance(ann, dict): 

75 raise ValueError(f"{obj!r}.__annotate__ returned a non-dict") 

76 return ann 

77 return None 

78 

79 # this is ported from py3.13.0a7 

80 _BASE_GET_ANNOTATIONS = type.__dict__["__annotations__"].__get__ # type: ignore # noqa: E501 

81 

82 def _get_dunder_annotations(obj): 

83 if isinstance(obj, type): 

84 try: 

85 ann = _BASE_GET_ANNOTATIONS(obj) 

86 except AttributeError: 

87 # For static types, the descriptor raises AttributeError. 

88 return {} 

89 else: 

90 ann = getattr(obj, "__annotations__", None) 

91 if ann is None: 

92 return {} 

93 

94 if not isinstance(ann, dict): 

95 raise ValueError( 

96 f"{obj!r}.__annotations__ is neither a dict nor None" 

97 ) 

98 return dict(ann) 

99 

100 def _vendored_get_annotations( 

101 obj: Any, *, format: Format # noqa: A002 

102 ) -> Mapping[str, Any]: 

103 """A sparse implementation of annotationlib.get_annotations()""" 

104 

105 try: 

106 ann = _get_dunder_annotations(obj) 

107 except Exception: 

108 pass 

109 else: 

110 if ann is not None: 

111 return dict(ann) 

112 

113 # But if __annotations__ threw a NameError, we try calling __annotate__ 

114 ann = _get_and_call_annotate(obj, format) 

115 if ann is None: 

116 # If that didn't work either, we have a very weird object: 

117 # evaluating 

118 # __annotations__ threw NameError and there is no __annotate__. 

119 # In that case, 

120 # we fall back to trying __annotations__ again. 

121 ann = _get_dunder_annotations(obj) 

122 

123 if ann is None: 

124 if isinstance(obj, type) or callable(obj): 

125 return {} 

126 raise TypeError(f"{obj!r} does not have annotations") 

127 

128 if not ann: 

129 return {} 

130 

131 return dict(ann) 

132 

133 def get_annotations(obj: Any) -> Mapping[str, Any]: 

134 # FORWARDREF has the effect of giving us ForwardRefs and not 

135 # actually trying to evaluate the annotations. We need this so 

136 # that the annotations act as much like 

137 # "from __future__ import annotations" as possible, which is going 

138 # away in future python as a separate mode 

139 return _vendored_get_annotations(obj, format=Format.FORWARDREF) 

140 

141elif compat.py310: 

142 

143 def get_annotations(obj: Any) -> Mapping[str, Any]: 

144 return inspect.get_annotations(obj) 

145 

146else: 

147 

148 def get_annotations(obj: Any) -> Mapping[str, Any]: 

149 # it's been observed that cls.__annotations__ can be non present. 

150 # it's not clear what causes this, running under tox py37/38 it 

151 # happens, running straight pytest it doesnt 

152 

153 # https://docs.python.org/3/howto/annotations.html#annotations-howto 

154 if isinstance(obj, type): 

155 ann = obj.__dict__.get("__annotations__", None) 

156 else: 

157 ann = getattr(obj, "__annotations__", None) 

158 

159 if ann is None: 

160 return _collections.EMPTY_DICT 

161 else: 

162 return cast("Mapping[str, Any]", ann) 

163 

164 

165def md5_hex(x: Any) -> str: 

166 x = x.encode("utf-8") 

167 m = compat.md5_not_for_security() 

168 m.update(x) 

169 return cast(str, m.hexdigest()) 

170 

171 

172class safe_reraise: 

173 """Reraise an exception after invoking some 

174 handler code. 

175 

176 Stores the existing exception info before 

177 invoking so that it is maintained across a potential 

178 coroutine context switch. 

179 

180 e.g.:: 

181 

182 try: 

183 sess.commit() 

184 except: 

185 with safe_reraise(): 

186 sess.rollback() 

187 

188 TODO: we should at some point evaluate current behaviors in this regard 

189 based on current greenlet, gevent/eventlet implementations in Python 3, and 

190 also see the degree to which our own asyncio (based on greenlet also) is 

191 impacted by this. .rollback() will cause IO / context switch to occur in 

192 all these scenarios; what happens to the exception context from an 

193 "except:" block if we don't explicitly store it? Original issue was #2703. 

194 

195 """ 

196 

197 __slots__ = ("_exc_info",) 

198 

199 _exc_info: Union[ 

200 None, 

201 Tuple[ 

202 Type[BaseException], 

203 BaseException, 

204 types.TracebackType, 

205 ], 

206 Tuple[None, None, None], 

207 ] 

208 

209 def __enter__(self) -> None: 

210 self._exc_info = sys.exc_info() 

211 

212 def __exit__( 

213 self, 

214 type_: Optional[Type[BaseException]], 

215 value: Optional[BaseException], 

216 traceback: Optional[types.TracebackType], 

217 ) -> NoReturn: 

218 assert self._exc_info is not None 

219 # see #2703 for notes 

220 if type_ is None: 

221 exc_type, exc_value, exc_tb = self._exc_info 

222 assert exc_value is not None 

223 self._exc_info = None # remove potential circular references 

224 raise exc_value.with_traceback(exc_tb) 

225 else: 

226 self._exc_info = None # remove potential circular references 

227 assert value is not None 

228 raise value.with_traceback(traceback) 

229 

230 

231def walk_subclasses(cls: Type[_T]) -> Iterator[Type[_T]]: 

232 seen: Set[Any] = set() 

233 

234 stack = [cls] 

235 while stack: 

236 cls = stack.pop() 

237 if cls in seen: 

238 continue 

239 else: 

240 seen.add(cls) 

241 stack.extend(cls.__subclasses__()) 

242 yield cls 

243 

244 

245def string_or_unprintable(element: Any) -> str: 

246 if isinstance(element, str): 

247 return element 

248 else: 

249 try: 

250 return str(element) 

251 except Exception: 

252 return "unprintable element %r" % element 

253 

254 

255def clsname_as_plain_name( 

256 cls: Type[Any], use_name: Optional[str] = None 

257) -> str: 

258 name = use_name or cls.__name__ 

259 return " ".join(n.lower() for n in re.findall(r"([A-Z][a-z]+|SQL)", name)) 

260 

261 

262def method_is_overridden( 

263 instance_or_cls: Union[Type[Any], object], 

264 against_method: Callable[..., Any], 

265) -> bool: 

266 """Return True if the two class methods don't match.""" 

267 

268 if not isinstance(instance_or_cls, type): 

269 current_cls = instance_or_cls.__class__ 

270 else: 

271 current_cls = instance_or_cls 

272 

273 method_name = against_method.__name__ 

274 

275 current_method: types.MethodType = getattr(current_cls, method_name) 

276 

277 return current_method != against_method 

278 

279 

280def decode_slice(slc: slice) -> Tuple[Any, ...]: 

281 """decode a slice object as sent to __getitem__. 

282 

283 takes into account the 2.5 __index__() method, basically. 

284 

285 """ 

286 ret: List[Any] = [] 

287 for x in slc.start, slc.stop, slc.step: 

288 if hasattr(x, "__index__"): 

289 x = x.__index__() 

290 ret.append(x) 

291 return tuple(ret) 

292 

293 

294def _unique_symbols(used: Sequence[str], *bases: str) -> Iterator[str]: 

295 used_set = set(used) 

296 for base in bases: 

297 pool = itertools.chain( 

298 (base,), 

299 map(lambda i: base + str(i), range(1000)), 

300 ) 

301 for sym in pool: 

302 if sym not in used_set: 

303 used_set.add(sym) 

304 yield sym 

305 break 

306 else: 

307 raise NameError("exhausted namespace for symbol base %s" % base) 

308 

309 

310def map_bits(fn: Callable[[int], Any], n: int) -> Iterator[Any]: 

311 """Call the given function given each nonzero bit from n.""" 

312 

313 while n: 

314 b = n & (~n + 1) 

315 yield fn(b) 

316 n ^= b 

317 

318 

319_Fn = TypeVar("_Fn", bound="Callable[..., Any]") 

320 

321# this seems to be in flux in recent mypy versions 

322 

323 

324def decorator(target: Callable[..., Any]) -> Callable[[_Fn], _Fn]: 

325 """A signature-matching decorator factory.""" 

326 

327 def decorate(fn: _Fn) -> _Fn: 

328 if not inspect.isfunction(fn) and not inspect.ismethod(fn): 

329 raise Exception("not a decoratable function") 

330 

331 # Python 3.14 defer creating __annotations__ until its used. 

332 # We do not want to create __annotations__ now. 

333 annofunc = getattr(fn, "__annotate__", None) 

334 if annofunc is not None: 

335 fn.__annotate__ = None # type: ignore[union-attr] 

336 try: 

337 spec = compat.inspect_getfullargspec(fn) 

338 finally: 

339 fn.__annotate__ = annofunc # type: ignore[union-attr] 

340 else: 

341 spec = compat.inspect_getfullargspec(fn) 

342 

343 # Do not generate code for annotations. 

344 # update_wrapper() copies the annotation from fn to decorated. 

345 # We use dummy defaults for code generation to avoid having 

346 # copy of large globals for compiling. 

347 # We copy __defaults__ and __kwdefaults__ from fn to decorated. 

348 empty_defaults = (None,) * len(spec.defaults or ()) 

349 empty_kwdefaults = dict.fromkeys(spec.kwonlydefaults or ()) 

350 spec = spec._replace( 

351 annotations={}, 

352 defaults=empty_defaults, 

353 kwonlydefaults=empty_kwdefaults, 

354 ) 

355 

356 names = ( 

357 tuple(cast("Tuple[str, ...]", spec[0])) 

358 + cast("Tuple[str, ...]", spec[1:3]) 

359 + (fn.__name__,) 

360 ) 

361 targ_name, fn_name = _unique_symbols(names, "target", "fn") 

362 

363 metadata: Dict[str, Optional[str]] = dict(target=targ_name, fn=fn_name) 

364 metadata.update(format_argspec_plus(spec, grouped=False)) 

365 metadata["name"] = fn.__name__ 

366 

367 if inspect.iscoroutinefunction(fn): 

368 metadata["prefix"] = "async " 

369 metadata["target_prefix"] = "await " 

370 else: 

371 metadata["prefix"] = "" 

372 metadata["target_prefix"] = "" 

373 

374 # look for __ positional arguments. This is a convention in 

375 # SQLAlchemy that arguments should be passed positionally 

376 # rather than as keyword 

377 # arguments. note that apply_pos doesn't currently work in all cases 

378 # such as when a kw-only indicator "*" is present, which is why 

379 # we limit the use of this to just that case we can detect. As we add 

380 # more kinds of methods that use @decorator, things may have to 

381 # be further improved in this area 

382 if "__" in repr(spec[0]): 

383 code = ( 

384 """\ 

385%(prefix)sdef %(name)s%(grouped_args)s: 

386 return %(target_prefix)s%(target)s(%(fn)s, %(apply_pos)s) 

387""" 

388 % metadata 

389 ) 

390 else: 

391 code = ( 

392 """\ 

393%(prefix)sdef %(name)s%(grouped_args)s: 

394 return %(target_prefix)s%(target)s(%(fn)s, %(apply_kw)s) 

395""" 

396 % metadata 

397 ) 

398 

399 env: Dict[str, Any] = { 

400 targ_name: target, 

401 fn_name: fn, 

402 "__name__": fn.__module__, 

403 } 

404 

405 decorated = cast( 

406 types.FunctionType, 

407 _exec_code_in_env(code, env, fn.__name__), 

408 ) 

409 decorated.__defaults__ = fn.__defaults__ 

410 decorated.__kwdefaults__ = fn.__kwdefaults__ # type: ignore 

411 return update_wrapper(decorated, fn) # type: ignore[return-value] 

412 

413 return update_wrapper(decorate, target) # type: ignore[return-value] 

414 

415 

416def _exec_code_in_env( 

417 code: Union[str, types.CodeType], env: Dict[str, Any], fn_name: str 

418) -> Callable[..., Any]: 

419 exec(code, env) 

420 return env[fn_name] # type: ignore[no-any-return] 

421 

422 

423_PF = TypeVar("_PF") 

424_TE = TypeVar("_TE") 

425 

426 

427class PluginLoader: 

428 def __init__( 

429 self, group: str, auto_fn: Optional[Callable[..., Any]] = None 

430 ): 

431 self.group = group 

432 self.impls: Dict[str, Any] = {} 

433 self.auto_fn = auto_fn 

434 

435 def clear(self): 

436 self.impls.clear() 

437 

438 def load(self, name: str) -> Any: 

439 if name in self.impls: 

440 return self.impls[name]() 

441 

442 if self.auto_fn: 

443 loader = self.auto_fn(name) 

444 if loader: 

445 self.impls[name] = loader 

446 return loader() 

447 

448 for impl in compat.importlib_metadata_get(self.group): 

449 if impl.name == name: 

450 self.impls[name] = impl.load 

451 return impl.load() 

452 

453 raise exc.NoSuchModuleError( 

454 "Can't load plugin: %s:%s" % (self.group, name) 

455 ) 

456 

457 def register(self, name: str, modulepath: str, objname: str) -> None: 

458 def load(): 

459 mod = __import__(modulepath) 

460 for token in modulepath.split(".")[1:]: 

461 mod = getattr(mod, token) 

462 return getattr(mod, objname) 

463 

464 self.impls[name] = load 

465 

466 def deregister(self, name: str) -> None: 

467 del self.impls[name] 

468 

469 

470def _inspect_func_args(fn): 

471 try: 

472 co_varkeywords = inspect.CO_VARKEYWORDS 

473 except AttributeError: 

474 # https://docs.python.org/3/library/inspect.html 

475 # The flags are specific to CPython, and may not be defined in other 

476 # Python implementations. Furthermore, the flags are an implementation 

477 # detail, and can be removed or deprecated in future Python releases. 

478 spec = compat.inspect_getfullargspec(fn) 

479 return spec[0], bool(spec[2]) 

480 else: 

481 # use fn.__code__ plus flags to reduce method call overhead 

482 co = fn.__code__ 

483 nargs = co.co_argcount 

484 return ( 

485 list(co.co_varnames[:nargs]), 

486 bool(co.co_flags & co_varkeywords), 

487 ) 

488 

489 

490@overload 

491def get_cls_kwargs( 

492 cls: type, 

493 *, 

494 _set: Optional[Set[str]] = None, 

495 raiseerr: Literal[True] = ..., 

496) -> Set[str]: ... 

497 

498 

499@overload 

500def get_cls_kwargs( 

501 cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False 

502) -> Optional[Set[str]]: ... 

503 

504 

505def get_cls_kwargs( 

506 cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False 

507) -> Optional[Set[str]]: 

508 r"""Return the full set of inherited kwargs for the given `cls`. 

509 

510 Probes a class's __init__ method, collecting all named arguments. If the 

511 __init__ defines a \**kwargs catch-all, then the constructor is presumed 

512 to pass along unrecognized keywords to its base classes, and the 

513 collection process is repeated recursively on each of the bases. 

514 

515 Uses a subset of inspect.getfullargspec() to cut down on method overhead, 

516 as this is used within the Core typing system to create copies of type 

517 objects which is a performance-sensitive operation. 

518 

519 No anonymous tuple arguments please ! 

520 

521 """ 

522 toplevel = _set is None 

523 if toplevel: 

524 _set = set() 

525 assert _set is not None 

526 

527 ctr = cls.__dict__.get("__init__", False) 

528 

529 has_init = ( 

530 ctr 

531 and isinstance(ctr, types.FunctionType) 

532 and isinstance(ctr.__code__, types.CodeType) 

533 ) 

534 

535 if has_init: 

536 names, has_kw = _inspect_func_args(ctr) 

537 _set.update(names) 

538 

539 if not has_kw and not toplevel: 

540 if raiseerr: 

541 raise TypeError( 

542 f"given cls {cls} doesn't have an __init__ method" 

543 ) 

544 else: 

545 return None 

546 else: 

547 has_kw = False 

548 

549 if not has_init or has_kw: 

550 for c in cls.__bases__: 

551 if get_cls_kwargs(c, _set=_set) is None: 

552 break 

553 

554 _set.discard("self") 

555 return _set 

556 

557 

558def get_func_kwargs(func: Callable[..., Any]) -> List[str]: 

559 """Return the set of legal kwargs for the given `func`. 

560 

561 Uses getargspec so is safe to call for methods, functions, 

562 etc. 

563 

564 """ 

565 

566 return compat.inspect_getfullargspec(func)[0] 

567 

568 

569def get_callable_argspec( 

570 fn: Callable[..., Any], no_self: bool = False, _is_init: bool = False 

571) -> compat.FullArgSpec: 

572 """Return the argument signature for any callable. 

573 

574 All pure-Python callables are accepted, including 

575 functions, methods, classes, objects with __call__; 

576 builtins and other edge cases like functools.partial() objects 

577 raise a TypeError. 

578 

579 """ 

580 if inspect.isbuiltin(fn): 

581 raise TypeError("Can't inspect builtin: %s" % fn) 

582 elif inspect.isfunction(fn): 

583 if _is_init and no_self: 

584 spec = compat.inspect_getfullargspec(fn) 

585 return compat.FullArgSpec( 

586 spec.args[1:], 

587 spec.varargs, 

588 spec.varkw, 

589 spec.defaults, 

590 spec.kwonlyargs, 

591 spec.kwonlydefaults, 

592 spec.annotations, 

593 ) 

594 else: 

595 return compat.inspect_getfullargspec(fn) 

596 elif inspect.ismethod(fn): 

597 if no_self and (_is_init or fn.__self__): 

598 spec = compat.inspect_getfullargspec(fn.__func__) 

599 return compat.FullArgSpec( 

600 spec.args[1:], 

601 spec.varargs, 

602 spec.varkw, 

603 spec.defaults, 

604 spec.kwonlyargs, 

605 spec.kwonlydefaults, 

606 spec.annotations, 

607 ) 

608 else: 

609 return compat.inspect_getfullargspec(fn.__func__) 

610 elif inspect.isclass(fn): 

611 return get_callable_argspec( 

612 fn.__init__, no_self=no_self, _is_init=True 

613 ) 

614 elif hasattr(fn, "__func__"): 

615 return compat.inspect_getfullargspec(fn.__func__) 

616 elif hasattr(fn, "__call__"): 

617 if inspect.ismethod(fn.__call__): 

618 return get_callable_argspec(fn.__call__, no_self=no_self) 

619 else: 

620 raise TypeError("Can't inspect callable: %s" % fn) 

621 else: 

622 raise TypeError("Can't inspect callable: %s" % fn) 

623 

624 

625def format_argspec_plus( 

626 fn: Union[Callable[..., Any], compat.FullArgSpec], grouped: bool = True 

627) -> Dict[str, Optional[str]]: 

628 """Returns a dictionary of formatted, introspected function arguments. 

629 

630 A enhanced variant of inspect.formatargspec to support code generation. 

631 

632 fn 

633 An inspectable callable or tuple of inspect getargspec() results. 

634 grouped 

635 Defaults to True; include (parens, around, argument) lists 

636 

637 Returns: 

638 

639 args 

640 Full inspect.formatargspec for fn 

641 self_arg 

642 The name of the first positional argument, varargs[0], or None 

643 if the function defines no positional arguments. 

644 apply_pos 

645 args, re-written in calling rather than receiving syntax. Arguments are 

646 passed positionally. 

647 apply_kw 

648 Like apply_pos, except keyword-ish args are passed as keywords. 

649 apply_pos_proxied 

650 Like apply_pos but omits the self/cls argument 

651 

652 Example:: 

653 

654 >>> format_argspec_plus(lambda self, a, b, c=3, **d: 123) 

655 {'grouped_args': '(self, a, b, c=3, **d)', 

656 'self_arg': 'self', 

657 'apply_kw': '(self, a, b, c=c, **d)', 

658 'apply_pos': '(self, a, b, c, **d)'} 

659 

660 """ 

661 if callable(fn): 

662 spec = compat.inspect_getfullargspec(fn) 

663 else: 

664 spec = fn 

665 

666 args = compat.inspect_formatargspec(*spec) 

667 

668 apply_pos = compat.inspect_formatargspec( 

669 spec[0], spec[1], spec[2], None, spec[4] 

670 ) 

671 

672 if spec[0]: 

673 self_arg = spec[0][0] 

674 

675 apply_pos_proxied = compat.inspect_formatargspec( 

676 spec[0][1:], spec[1], spec[2], None, spec[4] 

677 ) 

678 

679 elif spec[1]: 

680 # I'm not sure what this is 

681 self_arg = "%s[0]" % spec[1] 

682 

683 apply_pos_proxied = apply_pos 

684 else: 

685 self_arg = None 

686 apply_pos_proxied = apply_pos 

687 

688 num_defaults = 0 

689 if spec[3]: 

690 num_defaults += len(cast(Tuple[Any], spec[3])) 

691 if spec[4]: 

692 num_defaults += len(spec[4]) 

693 

694 name_args = spec[0] + spec[4] 

695 

696 defaulted_vals: Union[List[str], Tuple[()]] 

697 

698 if num_defaults: 

699 defaulted_vals = name_args[0 - num_defaults :] 

700 else: 

701 defaulted_vals = () 

702 

703 apply_kw = compat.inspect_formatargspec( 

704 name_args, 

705 spec[1], 

706 spec[2], 

707 defaulted_vals, 

708 formatvalue=lambda x: "=" + str(x), 

709 ) 

710 

711 if spec[0]: 

712 apply_kw_proxied = compat.inspect_formatargspec( 

713 name_args[1:], 

714 spec[1], 

715 spec[2], 

716 defaulted_vals, 

717 formatvalue=lambda x: "=" + str(x), 

718 ) 

719 else: 

720 apply_kw_proxied = apply_kw 

721 

722 if grouped: 

723 return dict( 

724 grouped_args=args, 

725 self_arg=self_arg, 

726 apply_pos=apply_pos, 

727 apply_kw=apply_kw, 

728 apply_pos_proxied=apply_pos_proxied, 

729 apply_kw_proxied=apply_kw_proxied, 

730 ) 

731 else: 

732 return dict( 

733 grouped_args=args, 

734 self_arg=self_arg, 

735 apply_pos=apply_pos[1:-1], 

736 apply_kw=apply_kw[1:-1], 

737 apply_pos_proxied=apply_pos_proxied[1:-1], 

738 apply_kw_proxied=apply_kw_proxied[1:-1], 

739 ) 

740 

741 

742def format_argspec_init(method, grouped=True): 

743 """format_argspec_plus with considerations for typical __init__ methods 

744 

745 Wraps format_argspec_plus with error handling strategies for typical 

746 __init__ cases: 

747 

748 .. sourcecode:: text 

749 

750 object.__init__ -> (self) 

751 other unreflectable (usually C) -> (self, *args, **kwargs) 

752 

753 """ 

754 if method is object.__init__: 

755 grouped_args = "(self)" 

756 args = "(self)" if grouped else "self" 

757 proxied = "()" if grouped else "" 

758 else: 

759 try: 

760 return format_argspec_plus(method, grouped=grouped) 

761 except TypeError: 

762 grouped_args = "(self, *args, **kwargs)" 

763 args = grouped_args if grouped else "self, *args, **kwargs" 

764 proxied = "(*args, **kwargs)" if grouped else "*args, **kwargs" 

765 return dict( 

766 self_arg="self", 

767 grouped_args=grouped_args, 

768 apply_pos=args, 

769 apply_kw=args, 

770 apply_pos_proxied=proxied, 

771 apply_kw_proxied=proxied, 

772 ) 

773 

774 

775def create_proxy_methods( 

776 target_cls: Type[Any], 

777 target_cls_sphinx_name: str, 

778 proxy_cls_sphinx_name: str, 

779 classmethods: Sequence[str] = (), 

780 methods: Sequence[str] = (), 

781 attributes: Sequence[str] = (), 

782 use_intermediate_variable: Sequence[str] = (), 

783) -> Callable[[_T], _T]: 

784 """A class decorator indicating attributes should refer to a proxy 

785 class. 

786 

787 This decorator is now a "marker" that does nothing at runtime. Instead, 

788 it is consumed by the tools/generate_proxy_methods.py script to 

789 statically generate proxy methods and attributes that are fully 

790 recognized by typing tools such as mypy. 

791 

792 """ 

793 

794 def decorate(cls): 

795 return cls 

796 

797 return decorate 

798 

799 

800def getargspec_init(method): 

801 """inspect.getargspec with considerations for typical __init__ methods 

802 

803 Wraps inspect.getargspec with error handling for typical __init__ cases: 

804 

805 .. sourcecode:: text 

806 

807 object.__init__ -> (self) 

808 other unreflectable (usually C) -> (self, *args, **kwargs) 

809 

810 """ 

811 try: 

812 return compat.inspect_getfullargspec(method) 

813 except TypeError: 

814 if method is object.__init__: 

815 return (["self"], None, None, None) 

816 else: 

817 return (["self"], "args", "kwargs", None) 

818 

819 

820def unbound_method_to_callable(func_or_cls): 

821 """Adjust the incoming callable such that a 'self' argument is not 

822 required. 

823 

824 """ 

825 

826 if isinstance(func_or_cls, types.MethodType) and not func_or_cls.__self__: 

827 return func_or_cls.__func__ 

828 else: 

829 return func_or_cls 

830 

831 

832def generic_repr( 

833 obj: Any, 

834 additional_kw: Sequence[Tuple[str, Any]] = (), 

835 to_inspect: Optional[Union[object, List[object]]] = None, 

836 omit_kwarg: Sequence[str] = (), 

837) -> str: 

838 """Produce a __repr__() based on direct association of the __init__() 

839 specification vs. same-named attributes present. 

840 

841 """ 

842 if to_inspect is None: 

843 to_inspect = [obj] 

844 else: 

845 to_inspect = _collections.to_list(to_inspect) 

846 

847 missing = object() 

848 

849 pos_args = [] 

850 kw_args: _collections.OrderedDict[str, Any] = _collections.OrderedDict() 

851 vargs = None 

852 for i, insp in enumerate(to_inspect): 

853 try: 

854 spec = compat.inspect_getfullargspec(insp.__init__) 

855 except TypeError: 

856 continue 

857 else: 

858 default_len = len(spec.defaults) if spec.defaults else 0 

859 if i == 0: 

860 if spec.varargs: 

861 vargs = spec.varargs 

862 if default_len: 

863 pos_args.extend(spec.args[1:-default_len]) 

864 else: 

865 pos_args.extend(spec.args[1:]) 

866 else: 

867 kw_args.update( 

868 [(arg, missing) for arg in spec.args[1:-default_len]] 

869 ) 

870 

871 if default_len: 

872 assert spec.defaults 

873 kw_args.update( 

874 [ 

875 (arg, default) 

876 for arg, default in zip( 

877 spec.args[-default_len:], spec.defaults 

878 ) 

879 ] 

880 ) 

881 output: List[str] = [] 

882 

883 output.extend(repr(getattr(obj, arg, None)) for arg in pos_args) 

884 

885 if vargs is not None and hasattr(obj, vargs): 

886 output.extend([repr(val) for val in getattr(obj, vargs)]) 

887 

888 for arg, defval in kw_args.items(): 

889 if arg in omit_kwarg: 

890 continue 

891 try: 

892 val = getattr(obj, arg, missing) 

893 if val is not missing and val != defval: 

894 output.append("%s=%r" % (arg, val)) 

895 except Exception: 

896 pass 

897 

898 if additional_kw: 

899 for arg, defval in additional_kw: 

900 try: 

901 val = getattr(obj, arg, missing) 

902 if val is not missing and val != defval: 

903 output.append("%s=%r" % (arg, val)) 

904 except Exception: 

905 pass 

906 

907 return "%s(%s)" % (obj.__class__.__name__, ", ".join(output)) 

908 

909 

910class portable_instancemethod: 

911 """Turn an instancemethod into a (parent, name) pair 

912 to produce a serializable callable. 

913 

914 """ 

915 

916 __slots__ = "target", "name", "kwargs", "__weakref__" 

917 

918 def __getstate__(self): 

919 return { 

920 "target": self.target, 

921 "name": self.name, 

922 "kwargs": self.kwargs, 

923 } 

924 

925 def __setstate__(self, state): 

926 self.target = state["target"] 

927 self.name = state["name"] 

928 self.kwargs = state.get("kwargs", ()) 

929 

930 def __init__(self, meth, kwargs=()): 

931 self.target = meth.__self__ 

932 self.name = meth.__name__ 

933 self.kwargs = kwargs 

934 

935 def __call__(self, *arg, **kw): 

936 kw.update(self.kwargs) 

937 return getattr(self.target, self.name)(*arg, **kw) 

938 

939 

940def class_hierarchy(cls): 

941 """Return an unordered sequence of all classes related to cls. 

942 

943 Traverses diamond hierarchies. 

944 

945 Fibs slightly: subclasses of builtin types are not returned. Thus 

946 class_hierarchy(class A(object)) returns (A, object), not A plus every 

947 class systemwide that derives from object. 

948 

949 """ 

950 

951 hier = {cls} 

952 process = list(cls.__mro__) 

953 while process: 

954 c = process.pop() 

955 bases = (_ for _ in c.__bases__ if _ not in hier) 

956 

957 for b in bases: 

958 process.append(b) 

959 hier.add(b) 

960 

961 if c.__module__ == "builtins" or not hasattr(c, "__subclasses__"): 

962 continue 

963 

964 for s in [ 

965 _ 

966 for _ in ( 

967 c.__subclasses__() 

968 if not issubclass(c, type) 

969 else c.__subclasses__(c) 

970 ) 

971 if _ not in hier 

972 ]: 

973 process.append(s) 

974 hier.add(s) 

975 return list(hier) 

976 

977 

978def iterate_attributes(cls): 

979 """iterate all the keys and attributes associated 

980 with a class, without using getattr(). 

981 

982 Does not use getattr() so that class-sensitive 

983 descriptors (i.e. property.__get__()) are not called. 

984 

985 """ 

986 keys = dir(cls) 

987 for key in keys: 

988 for c in cls.__mro__: 

989 if key in c.__dict__: 

990 yield (key, c.__dict__[key]) 

991 break 

992 

993 

994def monkeypatch_proxied_specials( 

995 into_cls, 

996 from_cls, 

997 skip=None, 

998 only=None, 

999 name="self.proxy", 

1000 from_instance=None, 

1001): 

1002 """Automates delegation of __specials__ for a proxying type.""" 

1003 

1004 if only: 

1005 dunders = only 

1006 else: 

1007 if skip is None: 

1008 skip = ( 

1009 "__slots__", 

1010 "__del__", 

1011 "__getattribute__", 

1012 "__metaclass__", 

1013 "__getstate__", 

1014 "__setstate__", 

1015 ) 

1016 dunders = [ 

1017 m 

1018 for m in dir(from_cls) 

1019 if ( 

1020 m.startswith("__") 

1021 and m.endswith("__") 

1022 and not hasattr(into_cls, m) 

1023 and m not in skip 

1024 ) 

1025 ] 

1026 

1027 for method in dunders: 

1028 try: 

1029 maybe_fn = getattr(from_cls, method) 

1030 if not hasattr(maybe_fn, "__call__"): 

1031 continue 

1032 maybe_fn = getattr(maybe_fn, "__func__", maybe_fn) 

1033 fn = cast(types.FunctionType, maybe_fn) 

1034 

1035 except AttributeError: 

1036 continue 

1037 try: 

1038 spec = compat.inspect_getfullargspec(fn) 

1039 fn_args = compat.inspect_formatargspec(spec[0]) 

1040 d_args = compat.inspect_formatargspec(spec[0][1:]) 

1041 except TypeError: 

1042 fn_args = "(self, *args, **kw)" 

1043 d_args = "(*args, **kw)" 

1044 

1045 py = ( 

1046 "def %(method)s%(fn_args)s: " 

1047 "return %(name)s.%(method)s%(d_args)s" % locals() 

1048 ) 

1049 

1050 env: Dict[str, types.FunctionType] = ( 

1051 from_instance is not None and {name: from_instance} or {} 

1052 ) 

1053 exec(py, env) 

1054 try: 

1055 env[method].__defaults__ = fn.__defaults__ 

1056 except AttributeError: 

1057 pass 

1058 setattr(into_cls, method, env[method]) 

1059 

1060 

1061def methods_equivalent(meth1, meth2): 

1062 """Return True if the two methods are the same implementation.""" 

1063 

1064 return getattr(meth1, "__func__", meth1) is getattr( 

1065 meth2, "__func__", meth2 

1066 ) 

1067 

1068 

1069def as_interface(obj, cls=None, methods=None, required=None): 

1070 """Ensure basic interface compliance for an instance or dict of callables. 

1071 

1072 Checks that ``obj`` implements public methods of ``cls`` or has members 

1073 listed in ``methods``. If ``required`` is not supplied, implementing at 

1074 least one interface method is sufficient. Methods present on ``obj`` that 

1075 are not in the interface are ignored. 

1076 

1077 If ``obj`` is a dict and ``dict`` does not meet the interface 

1078 requirements, the keys of the dictionary are inspected. Keys present in 

1079 ``obj`` that are not in the interface will raise TypeErrors. 

1080 

1081 Raises TypeError if ``obj`` does not meet the interface criteria. 

1082 

1083 In all passing cases, an object with callable members is returned. In the 

1084 simple case, ``obj`` is returned as-is; if dict processing kicks in then 

1085 an anonymous class is returned. 

1086 

1087 obj 

1088 A type, instance, or dictionary of callables. 

1089 cls 

1090 Optional, a type. All public methods of cls are considered the 

1091 interface. An ``obj`` instance of cls will always pass, ignoring 

1092 ``required``.. 

1093 methods 

1094 Optional, a sequence of method names to consider as the interface. 

1095 required 

1096 Optional, a sequence of mandatory implementations. If omitted, an 

1097 ``obj`` that provides at least one interface method is considered 

1098 sufficient. As a convenience, required may be a type, in which case 

1099 all public methods of the type are required. 

1100 

1101 """ 

1102 if not cls and not methods: 

1103 raise TypeError("a class or collection of method names are required") 

1104 

1105 if isinstance(cls, type) and isinstance(obj, cls): 

1106 return obj 

1107 

1108 interface = set(methods or [m for m in dir(cls) if not m.startswith("_")]) 

1109 implemented = set(dir(obj)) 

1110 

1111 complies = operator.ge 

1112 if isinstance(required, type): 

1113 required = interface 

1114 elif not required: 

1115 required = set() 

1116 complies = operator.gt 

1117 else: 

1118 required = set(required) 

1119 

1120 if complies(implemented.intersection(interface), required): 

1121 return obj 

1122 

1123 # No dict duck typing here. 

1124 if not isinstance(obj, dict): 

1125 qualifier = complies is operator.gt and "any of" or "all of" 

1126 raise TypeError( 

1127 "%r does not implement %s: %s" 

1128 % (obj, qualifier, ", ".join(interface)) 

1129 ) 

1130 

1131 class AnonymousInterface: 

1132 """A callable-holding shell.""" 

1133 

1134 if cls: 

1135 AnonymousInterface.__name__ = "Anonymous" + cls.__name__ 

1136 found = set() 

1137 

1138 for method, impl in dictlike_iteritems(obj): 

1139 if method not in interface: 

1140 raise TypeError("%r: unknown in this interface" % method) 

1141 if not callable(impl): 

1142 raise TypeError("%r=%r is not callable" % (method, impl)) 

1143 setattr(AnonymousInterface, method, staticmethod(impl)) 

1144 found.add(method) 

1145 

1146 if complies(found, required): 

1147 return AnonymousInterface 

1148 

1149 raise TypeError( 

1150 "dictionary does not contain required keys %s" 

1151 % ", ".join(required - found) 

1152 ) 

1153 

1154 

1155_GFD = TypeVar("_GFD", bound="generic_fn_descriptor[Any]") 

1156 

1157 

1158class generic_fn_descriptor(Generic[_T_co]): 

1159 """Descriptor which proxies a function when the attribute is not 

1160 present in dict 

1161 

1162 This superclass is organized in a particular way with "memoized" and 

1163 "non-memoized" implementation classes that are hidden from type checkers, 

1164 as Mypy seems to not be able to handle seeing multiple kinds of descriptor 

1165 classes used for the same attribute. 

1166 

1167 """ 

1168 

1169 fget: Callable[..., _T_co] 

1170 __doc__: Optional[str] 

1171 __name__: str 

1172 

1173 def __init__(self, fget: Callable[..., _T_co], doc: Optional[str] = None): 

1174 self.fget = fget 

1175 self.__doc__ = doc or fget.__doc__ 

1176 self.__name__ = fget.__name__ 

1177 

1178 @overload 

1179 def __get__(self: _GFD, obj: None, cls: Any) -> _GFD: ... 

1180 

1181 @overload 

1182 def __get__(self, obj: object, cls: Any) -> _T_co: ... 

1183 

1184 def __get__(self: _GFD, obj: Any, cls: Any) -> Union[_GFD, _T_co]: 

1185 raise NotImplementedError() 

1186 

1187 if TYPE_CHECKING: 

1188 

1189 def __set__(self, instance: Any, value: Any) -> None: ... 

1190 

1191 def __delete__(self, instance: Any) -> None: ... 

1192 

1193 def _reset(self, obj: Any) -> None: 

1194 raise NotImplementedError() 

1195 

1196 @classmethod 

1197 def reset(cls, obj: Any, name: str) -> None: 

1198 raise NotImplementedError() 

1199 

1200 

1201class _non_memoized_property(generic_fn_descriptor[_T_co]): 

1202 """a plain descriptor that proxies a function. 

1203 

1204 primary rationale is to provide a plain attribute that's 

1205 compatible with memoized_property which is also recognized as equivalent 

1206 by mypy. 

1207 

1208 """ 

1209 

1210 if not TYPE_CHECKING: 

1211 

1212 def __get__(self, obj, cls): 

1213 if obj is None: 

1214 return self 

1215 return self.fget(obj) 

1216 

1217 

1218class _memoized_property(generic_fn_descriptor[_T_co]): 

1219 """A read-only @property that is only evaluated once.""" 

1220 

1221 if not TYPE_CHECKING: 

1222 

1223 def __get__(self, obj, cls): 

1224 if obj is None: 

1225 return self 

1226 obj.__dict__[self.__name__] = result = self.fget(obj) 

1227 return result 

1228 

1229 def _reset(self, obj): 

1230 _memoized_property.reset(obj, self.__name__) 

1231 

1232 @classmethod 

1233 def reset(cls, obj, name): 

1234 obj.__dict__.pop(name, None) 

1235 

1236 

1237# despite many attempts to get Mypy to recognize an overridden descriptor 

1238# where one is memoized and the other isn't, there seems to be no reliable 

1239# way other than completely deceiving the type checker into thinking there 

1240# is just one single descriptor type everywhere. Otherwise, if a superclass 

1241# has non-memoized and subclass has memoized, that requires 

1242# "class memoized(non_memoized)". but then if a superclass has memoized and 

1243# superclass has non-memoized, the class hierarchy of the descriptors 

1244# would need to be reversed; "class non_memoized(memoized)". so there's no 

1245# way to achieve this. 

1246# additional issues, RO properties: 

1247# https://github.com/python/mypy/issues/12440 

1248if TYPE_CHECKING: 

1249 # allow memoized and non-memoized to be freely mixed by having them 

1250 # be the same class 

1251 memoized_property = generic_fn_descriptor 

1252 non_memoized_property = generic_fn_descriptor 

1253 

1254 # for read only situations, mypy only sees @property as read only. 

1255 # read only is needed when a subtype specializes the return type 

1256 # of a property, meaning assignment needs to be disallowed 

1257 ro_memoized_property = property 

1258 ro_non_memoized_property = property 

1259 

1260else: 

1261 memoized_property = ro_memoized_property = _memoized_property 

1262 non_memoized_property = ro_non_memoized_property = _non_memoized_property 

1263 

1264 

1265def memoized_instancemethod(fn: _F) -> _F: 

1266 """Decorate a method memoize its return value. 

1267 

1268 Best applied to no-arg methods: memoization is not sensitive to 

1269 argument values, and will always return the same value even when 

1270 called with different arguments. 

1271 

1272 """ 

1273 

1274 def oneshot(self, *args, **kw): 

1275 result = fn(self, *args, **kw) 

1276 

1277 def memo(*a, **kw): 

1278 return result 

1279 

1280 memo.__name__ = fn.__name__ 

1281 memo.__doc__ = fn.__doc__ 

1282 self.__dict__[fn.__name__] = memo 

1283 return result 

1284 

1285 return update_wrapper(oneshot, fn) # type: ignore 

1286 

1287 

1288class HasMemoized: 

1289 """A mixin class that maintains the names of memoized elements in a 

1290 collection for easy cache clearing, generative, etc. 

1291 

1292 """ 

1293 

1294 if not TYPE_CHECKING: 

1295 # support classes that want to have __slots__ with an explicit 

1296 # slot for __dict__. not sure if that requires base __slots__ here. 

1297 __slots__ = () 

1298 

1299 _memoized_keys: FrozenSet[str] = frozenset() 

1300 

1301 def _reset_memoizations(self) -> None: 

1302 for elem in self._memoized_keys: 

1303 self.__dict__.pop(elem, None) 

1304 

1305 def _assert_no_memoizations(self) -> None: 

1306 for elem in self._memoized_keys: 

1307 assert elem not in self.__dict__ 

1308 

1309 def _set_memoized_attribute(self, key: str, value: Any) -> None: 

1310 self.__dict__[key] = value 

1311 self._memoized_keys |= {key} 

1312 

1313 class memoized_attribute(memoized_property[_T]): 

1314 """A read-only @property that is only evaluated once. 

1315 

1316 :meta private: 

1317 

1318 """ 

1319 

1320 fget: Callable[..., _T] 

1321 __doc__: Optional[str] 

1322 __name__: str 

1323 

1324 def __init__(self, fget: Callable[..., _T], doc: Optional[str] = None): 

1325 self.fget = fget 

1326 self.__doc__ = doc or fget.__doc__ 

1327 self.__name__ = fget.__name__ 

1328 

1329 @overload 

1330 def __get__(self: _MA, obj: None, cls: Any) -> _MA: ... 

1331 

1332 @overload 

1333 def __get__(self, obj: Any, cls: Any) -> _T: ... 

1334 

1335 def __get__(self, obj, cls): 

1336 if obj is None: 

1337 return self 

1338 obj.__dict__[self.__name__] = result = self.fget(obj) 

1339 obj._memoized_keys |= {self.__name__} 

1340 return result 

1341 

1342 @classmethod 

1343 def memoized_instancemethod(cls, fn: _F) -> _F: 

1344 """Decorate a method memoize its return value. 

1345 

1346 :meta private: 

1347 

1348 """ 

1349 

1350 def oneshot(self: Any, *args: Any, **kw: Any) -> Any: 

1351 result = fn(self, *args, **kw) 

1352 

1353 def memo(*a, **kw): 

1354 return result 

1355 

1356 memo.__name__ = fn.__name__ 

1357 memo.__doc__ = fn.__doc__ 

1358 self.__dict__[fn.__name__] = memo 

1359 self._memoized_keys |= {fn.__name__} 

1360 return result 

1361 

1362 return update_wrapper(oneshot, fn) # type: ignore 

1363 

1364 

1365if TYPE_CHECKING: 

1366 HasMemoized_ro_memoized_attribute = property 

1367else: 

1368 HasMemoized_ro_memoized_attribute = HasMemoized.memoized_attribute 

1369 

1370 

1371class MemoizedSlots: 

1372 """Apply memoized items to an object using a __getattr__ scheme. 

1373 

1374 This allows the functionality of memoized_property and 

1375 memoized_instancemethod to be available to a class using __slots__. 

1376 

1377 The memoized get is not threadsafe under freethreading and the 

1378 creator method may in extremely rare cases be called more than once. 

1379 

1380 """ 

1381 

1382 __slots__ = () 

1383 

1384 def _fallback_getattr(self, key): 

1385 raise AttributeError(key) 

1386 

1387 def __getattr__(self, key: str) -> Any: 

1388 if key.startswith("_memoized_attr_") or key.startswith( 

1389 "_memoized_method_" 

1390 ): 

1391 raise AttributeError(key) 

1392 # to avoid recursion errors when interacting with other __getattr__ 

1393 # schemes that refer to this one, when testing for memoized method 

1394 # look at __class__ only rather than going into __getattr__ again. 

1395 elif hasattr(self.__class__, f"_memoized_attr_{key}"): 

1396 value = getattr(self, f"_memoized_attr_{key}")() 

1397 setattr(self, key, value) 

1398 return value 

1399 elif hasattr(self.__class__, f"_memoized_method_{key}"): 

1400 meth = getattr(self, f"_memoized_method_{key}") 

1401 

1402 def oneshot(*args, **kw): 

1403 result = meth(*args, **kw) 

1404 

1405 def memo(*a, **kw): 

1406 return result 

1407 

1408 memo.__name__ = meth.__name__ 

1409 memo.__doc__ = meth.__doc__ 

1410 setattr(self, key, memo) 

1411 return result 

1412 

1413 oneshot.__doc__ = meth.__doc__ 

1414 return oneshot 

1415 else: 

1416 return self._fallback_getattr(key) 

1417 

1418 

1419# from paste.deploy.converters 

1420def asbool(obj: Any) -> bool: 

1421 if isinstance(obj, str): 

1422 obj = obj.strip().lower() 

1423 if obj in ["true", "yes", "on", "y", "t", "1"]: 

1424 return True 

1425 elif obj in ["false", "no", "off", "n", "f", "0"]: 

1426 return False 

1427 else: 

1428 raise ValueError("String is not true/false: %r" % obj) 

1429 return bool(obj) 

1430 

1431 

1432def bool_or_str(*text: str) -> Callable[[str], Union[str, bool]]: 

1433 """Return a callable that will evaluate a string as 

1434 boolean, or one of a set of "alternate" string values. 

1435 

1436 """ 

1437 

1438 def bool_or_value(obj: str) -> Union[str, bool]: 

1439 if obj in text: 

1440 return obj 

1441 else: 

1442 return asbool(obj) 

1443 

1444 return bool_or_value 

1445 

1446 

1447def asint(value: Any) -> Optional[int]: 

1448 """Coerce to integer.""" 

1449 

1450 if value is None: 

1451 return value 

1452 return int(value) 

1453 

1454 

1455def coerce_kw_type( 

1456 kw: Dict[str, Any], 

1457 key: str, 

1458 type_: Type[Any], 

1459 flexi_bool: bool = True, 

1460 dest: Optional[Dict[str, Any]] = None, 

1461) -> None: 

1462 r"""If 'key' is present in dict 'kw', coerce its value to type 'type\_' if 

1463 necessary. If 'flexi_bool' is True, the string '0' is considered false 

1464 when coercing to boolean. 

1465 """ 

1466 

1467 if dest is None: 

1468 dest = kw 

1469 

1470 if ( 

1471 key in kw 

1472 and (not isinstance(type_, type) or not isinstance(kw[key], type_)) 

1473 and kw[key] is not None 

1474 ): 

1475 if type_ is bool and flexi_bool: 

1476 dest[key] = asbool(kw[key]) 

1477 else: 

1478 dest[key] = type_(kw[key]) 

1479 

1480 

1481def constructor_key(obj: Any, cls: Type[Any]) -> Tuple[Any, ...]: 

1482 """Produce a tuple structure that is cacheable using the __dict__ of 

1483 obj to retrieve values 

1484 

1485 """ 

1486 names = get_cls_kwargs(cls) 

1487 return (cls,) + tuple( 

1488 (k, obj.__dict__[k]) for k in names if k in obj.__dict__ 

1489 ) 

1490 

1491 

1492def constructor_copy(obj: _T, cls: Type[_T], *args: Any, **kw: Any) -> _T: 

1493 """Instantiate cls using the __dict__ of obj as constructor arguments. 

1494 

1495 Uses inspect to match the named arguments of ``cls``. 

1496 

1497 """ 

1498 

1499 names = get_cls_kwargs(cls) 

1500 kw.update( 

1501 (k, obj.__dict__[k]) for k in names.difference(kw) if k in obj.__dict__ 

1502 ) 

1503 return cls(*args, **kw) 

1504 

1505 

1506def counter() -> Callable[[], int]: 

1507 """Return a threadsafe counter function.""" 

1508 

1509 lock = threading.Lock() 

1510 counter = itertools.count(1) 

1511 

1512 # avoid the 2to3 "next" transformation... 

1513 def _next(): 

1514 with lock: 

1515 return next(counter) 

1516 

1517 return _next 

1518 

1519 

1520def duck_type_collection( 

1521 specimen: Any, default: Optional[Type[Any]] = None 

1522) -> Optional[Type[Any]]: 

1523 """Given an instance or class, guess if it is or is acting as one of 

1524 the basic collection types: list, set and dict. If the __emulates__ 

1525 property is present, return that preferentially. 

1526 """ 

1527 

1528 if hasattr(specimen, "__emulates__"): 

1529 # canonicalize set vs sets.Set to a standard: the builtin set 

1530 if specimen.__emulates__ is not None and issubclass( 

1531 specimen.__emulates__, set 

1532 ): 

1533 return set 

1534 else: 

1535 return specimen.__emulates__ # type: ignore 

1536 

1537 isa = issubclass if isinstance(specimen, type) else isinstance 

1538 if isa(specimen, list): 

1539 return list 

1540 elif isa(specimen, set): 

1541 return set 

1542 elif isa(specimen, dict): 

1543 return dict 

1544 

1545 if hasattr(specimen, "append"): 

1546 return list 

1547 elif hasattr(specimen, "add"): 

1548 return set 

1549 elif hasattr(specimen, "set"): 

1550 return dict 

1551 else: 

1552 return default 

1553 

1554 

1555def assert_arg_type( 

1556 arg: Any, argtype: Union[Tuple[Type[Any], ...], Type[Any]], name: str 

1557) -> Any: 

1558 if isinstance(arg, argtype): 

1559 return arg 

1560 else: 

1561 if isinstance(argtype, tuple): 

1562 raise exc.ArgumentError( 

1563 "Argument '%s' is expected to be one of type %s, got '%s'" 

1564 % (name, " or ".join("'%s'" % a for a in argtype), type(arg)) 

1565 ) 

1566 else: 

1567 raise exc.ArgumentError( 

1568 "Argument '%s' is expected to be of type '%s', got '%s'" 

1569 % (name, argtype, type(arg)) 

1570 ) 

1571 

1572 

1573def dictlike_iteritems(dictlike): 

1574 """Return a (key, value) iterator for almost any dict-like object.""" 

1575 

1576 if hasattr(dictlike, "items"): 

1577 return list(dictlike.items()) 

1578 

1579 getter = getattr(dictlike, "__getitem__", getattr(dictlike, "get", None)) 

1580 if getter is None: 

1581 raise TypeError("Object '%r' is not dict-like" % dictlike) 

1582 

1583 if hasattr(dictlike, "iterkeys"): 

1584 

1585 def iterator(): 

1586 for key in dictlike.iterkeys(): 

1587 assert getter is not None 

1588 yield key, getter(key) 

1589 

1590 return iterator() 

1591 elif hasattr(dictlike, "keys"): 

1592 return iter((key, getter(key)) for key in dictlike.keys()) 

1593 else: 

1594 raise TypeError("Object '%r' is not dict-like" % dictlike) 

1595 

1596 

1597class classproperty(property): 

1598 """A decorator that behaves like @property except that operates 

1599 on classes rather than instances. 

1600 

1601 The decorator is currently special when using the declarative 

1602 module, but note that the 

1603 :class:`~.sqlalchemy.ext.declarative.declared_attr` 

1604 decorator should be used for this purpose with declarative. 

1605 

1606 """ 

1607 

1608 fget: Callable[[Any], Any] 

1609 

1610 def __init__(self, fget: Callable[[Any], Any], *arg: Any, **kw: Any): 

1611 super().__init__(fget, *arg, **kw) 

1612 self.__doc__ = fget.__doc__ 

1613 

1614 def __get__(self, obj: Any, cls: Optional[type] = None) -> Any: 

1615 return self.fget(cls) 

1616 

1617 

1618class hybridproperty(Generic[_T]): 

1619 def __init__(self, func: Callable[..., _T]): 

1620 self.func = func 

1621 self.clslevel = func 

1622 

1623 def __get__(self, instance: Any, owner: Any) -> _T: 

1624 if instance is None: 

1625 clsval = self.clslevel(owner) 

1626 return clsval 

1627 else: 

1628 return self.func(instance) 

1629 

1630 def classlevel(self, func: Callable[..., Any]) -> hybridproperty[_T]: 

1631 self.clslevel = func 

1632 return self 

1633 

1634 

1635class rw_hybridproperty(Generic[_T]): 

1636 def __init__(self, func: Callable[..., _T]): 

1637 self.func = func 

1638 self.clslevel = func 

1639 self.setfn: Optional[Callable[..., Any]] = None 

1640 

1641 def __get__(self, instance: Any, owner: Any) -> _T: 

1642 if instance is None: 

1643 clsval = self.clslevel(owner) 

1644 return clsval 

1645 else: 

1646 return self.func(instance) 

1647 

1648 def __set__(self, instance: Any, value: Any) -> None: 

1649 assert self.setfn is not None 

1650 self.setfn(instance, value) 

1651 

1652 def setter(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]: 

1653 self.setfn = func 

1654 return self 

1655 

1656 def classlevel(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]: 

1657 self.clslevel = func 

1658 return self 

1659 

1660 

1661class hybridmethod(Generic[_T]): 

1662 """Decorate a function as cls- or instance- level.""" 

1663 

1664 def __init__(self, func: Callable[..., _T]): 

1665 self.func = self.__func__ = func 

1666 self.clslevel = func 

1667 

1668 def __get__(self, instance: Any, owner: Any) -> Callable[..., _T]: 

1669 if instance is None: 

1670 return self.clslevel.__get__(owner, owner.__class__) # type:ignore 

1671 else: 

1672 return self.func.__get__(instance, owner) # type:ignore 

1673 

1674 def classlevel(self, func: Callable[..., Any]) -> hybridmethod[_T]: 

1675 self.clslevel = func 

1676 return self 

1677 

1678 

1679class symbol(int): 

1680 """A constant symbol. 

1681 

1682 >>> symbol("foo") is symbol("foo") 

1683 True 

1684 >>> symbol("foo") 

1685 <symbol 'foo> 

1686 

1687 A slight refinement of the MAGICCOOKIE=object() pattern. The primary 

1688 advantage of symbol() is its repr(). They are also singletons. 

1689 

1690 Repeated calls of symbol('name') will all return the same instance. 

1691 

1692 """ 

1693 

1694 name: str 

1695 

1696 symbols: Dict[str, symbol] = {} 

1697 _lock = threading.Lock() 

1698 

1699 def __new__( 

1700 cls, 

1701 name: str, 

1702 doc: Optional[str] = None, 

1703 canonical: Optional[int] = None, 

1704 ) -> symbol: 

1705 with cls._lock: 

1706 sym = cls.symbols.get(name) 

1707 if sym is None: 

1708 assert isinstance(name, str) 

1709 if canonical is None: 

1710 canonical = hash(name) 

1711 sym = int.__new__(symbol, canonical) 

1712 sym.name = name 

1713 if doc: 

1714 sym.__doc__ = doc 

1715 

1716 # NOTE: we should ultimately get rid of this global thing, 

1717 # however, currently it is to support pickling. The best 

1718 # change would be when we are on py3.11 at a minimum, we 

1719 # switch to stdlib enum.IntFlag. 

1720 cls.symbols[name] = sym 

1721 else: 

1722 if canonical and canonical != sym: 

1723 raise TypeError( 

1724 f"Can't replace canonical symbol for {name!r} " 

1725 f"with new int value {canonical}" 

1726 ) 

1727 return sym 

1728 

1729 def __reduce__(self): 

1730 return symbol, (self.name, "x", int(self)) 

1731 

1732 def __str__(self): 

1733 return repr(self) 

1734 

1735 def __repr__(self): 

1736 return f"symbol({self.name!r})" 

1737 

1738 

1739class _IntFlagMeta(type): 

1740 def __init__( 

1741 cls, 

1742 classname: str, 

1743 bases: Tuple[Type[Any], ...], 

1744 dict_: Dict[str, Any], 

1745 **kw: Any, 

1746 ) -> None: 

1747 items: List[symbol] 

1748 cls._items = items = [] 

1749 for k, v in dict_.items(): 

1750 if re.match(r"^__.*__$", k): 

1751 continue 

1752 if isinstance(v, int): 

1753 sym = symbol(k, canonical=v) 

1754 elif not k.startswith("_"): 

1755 raise TypeError("Expected integer values for IntFlag") 

1756 else: 

1757 continue 

1758 setattr(cls, k, sym) 

1759 items.append(sym) 

1760 

1761 cls.__members__ = _collections.immutabledict( 

1762 {sym.name: sym for sym in items} 

1763 ) 

1764 

1765 def __iter__(self) -> Iterator[symbol]: 

1766 raise NotImplementedError( 

1767 "iter not implemented to ensure compatibility with " 

1768 "Python 3.11 IntFlag. Please use __members__. See " 

1769 "https://github.com/python/cpython/issues/99304" 

1770 ) 

1771 

1772 

1773class _FastIntFlag(metaclass=_IntFlagMeta): 

1774 """An 'IntFlag' copycat that isn't slow when performing bitwise 

1775 operations. 

1776 

1777 the ``FastIntFlag`` class will return ``enum.IntFlag`` under TYPE_CHECKING 

1778 and ``_FastIntFlag`` otherwise. 

1779 

1780 """ 

1781 

1782 

1783if TYPE_CHECKING: 

1784 from enum import IntFlag 

1785 

1786 FastIntFlag = IntFlag 

1787else: 

1788 FastIntFlag = _FastIntFlag 

1789 

1790 

1791_E = TypeVar("_E", bound=enum.Enum) 

1792 

1793 

1794def parse_user_argument_for_enum( 

1795 arg: Any, 

1796 choices: Dict[_E, List[Any]], 

1797 name: str, 

1798 resolve_symbol_names: bool = False, 

1799) -> Optional[_E]: 

1800 """Given a user parameter, parse the parameter into a chosen value 

1801 from a list of choice objects, typically Enum values. 

1802 

1803 The user argument can be a string name that matches the name of a 

1804 symbol, or the symbol object itself, or any number of alternate choices 

1805 such as True/False/ None etc. 

1806 

1807 :param arg: the user argument. 

1808 :param choices: dictionary of enum values to lists of possible 

1809 entries for each. 

1810 :param name: name of the argument. Used in an :class:`.ArgumentError` 

1811 that is raised if the parameter doesn't match any available argument. 

1812 

1813 """ 

1814 for enum_value, choice in choices.items(): 

1815 if arg is enum_value: 

1816 return enum_value 

1817 elif resolve_symbol_names and arg == enum_value.name: 

1818 return enum_value 

1819 elif arg in choice: 

1820 return enum_value 

1821 

1822 if arg is None: 

1823 return None 

1824 

1825 raise exc.ArgumentError(f"Invalid value for '{name}': {arg!r}") 

1826 

1827 

1828_creation_order = 1 

1829 

1830 

1831def set_creation_order(instance: Any) -> None: 

1832 """Assign a '_creation_order' sequence to the given instance. 

1833 

1834 This allows multiple instances to be sorted in order of creation 

1835 (typically within a single thread; the counter is not particularly 

1836 threadsafe). 

1837 

1838 """ 

1839 global _creation_order 

1840 instance._creation_order = _creation_order 

1841 _creation_order += 1 

1842 

1843 

1844def warn_exception(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: 

1845 """executes the given function, catches all exceptions and converts to 

1846 a warning. 

1847 

1848 """ 

1849 try: 

1850 return func(*args, **kwargs) 

1851 except Exception: 

1852 warn("%s('%s') ignored" % sys.exc_info()[0:2]) 

1853 

1854 

1855def ellipses_string(value, len_=25): 

1856 try: 

1857 if len(value) > len_: 

1858 return "%s..." % value[0:len_] 

1859 else: 

1860 return value 

1861 except TypeError: 

1862 return value 

1863 

1864 

1865class _hash_limit_string(str): 

1866 """A string subclass that can only be hashed on a maximum amount 

1867 of unique values. 

1868 

1869 This is used for warnings so that we can send out parameterized warnings 

1870 without the __warningregistry__ of the module, or the non-overridable 

1871 "once" registry within warnings.py, overloading memory, 

1872 

1873 

1874 """ 

1875 

1876 _hash: int 

1877 

1878 def __new__( 

1879 cls, value: str, num: int, args: Sequence[Any] 

1880 ) -> _hash_limit_string: 

1881 interpolated = (value % args) + ( 

1882 " (this warning may be suppressed after %d occurrences)" % num 

1883 ) 

1884 self = super().__new__(cls, interpolated) 

1885 self._hash = hash("%s_%d" % (value, hash(interpolated) % num)) 

1886 return self 

1887 

1888 def __hash__(self) -> int: 

1889 return self._hash 

1890 

1891 def __eq__(self, other: Any) -> bool: 

1892 return hash(self) == hash(other) 

1893 

1894 

1895def warn(msg: str, code: Optional[str] = None) -> None: 

1896 """Issue a warning. 

1897 

1898 If msg is a string, :class:`.exc.SAWarning` is used as 

1899 the category. 

1900 

1901 """ 

1902 if code: 

1903 _warnings_warn(exc.SAWarning(msg, code=code)) 

1904 else: 

1905 _warnings_warn(msg, exc.SAWarning) 

1906 

1907 

1908def warn_limited(msg: str, args: Sequence[Any]) -> None: 

1909 """Issue a warning with a parameterized string, limiting the number 

1910 of registrations. 

1911 

1912 """ 

1913 if args: 

1914 msg = _hash_limit_string(msg, 10, args) 

1915 _warnings_warn(msg, exc.SAWarning) 

1916 

1917 

1918_warning_tags: Dict[CodeType, Tuple[str, Type[Warning]]] = {} 

1919 

1920 

1921def tag_method_for_warnings( 

1922 message: str, category: Type[Warning] 

1923) -> Callable[[_F], _F]: 

1924 def go(fn): 

1925 _warning_tags[fn.__code__] = (message, category) 

1926 return fn 

1927 

1928 return go 

1929 

1930 

1931_not_sa_pattern = re.compile(r"^(?:sqlalchemy\.(?!testing)|alembic\.)") 

1932 

1933 

1934def _warnings_warn( 

1935 message: Union[str, Warning], 

1936 category: Optional[Type[Warning]] = None, 

1937 stacklevel: int = 2, 

1938) -> None: 

1939 # adjust the given stacklevel to be outside of SQLAlchemy 

1940 try: 

1941 frame = sys._getframe(stacklevel) 

1942 except ValueError: 

1943 # being called from less than 3 (or given) stacklevels, weird, 

1944 # but don't crash 

1945 stacklevel = 0 

1946 except: 

1947 # _getframe() doesn't work, weird interpreter issue, weird, 

1948 # ok, but don't crash 

1949 stacklevel = 0 

1950 else: 

1951 stacklevel_found = warning_tag_found = False 

1952 while frame is not None: 

1953 # using __name__ here requires that we have __name__ in the 

1954 # __globals__ of the decorated string functions we make also. 

1955 # we generate this using {"__name__": fn.__module__} 

1956 if not stacklevel_found and not re.match( 

1957 _not_sa_pattern, frame.f_globals.get("__name__", "") 

1958 ): 

1959 # stop incrementing stack level if an out-of-SQLA line 

1960 # were found. 

1961 stacklevel_found = True 

1962 

1963 # however, for the warning tag thing, we have to keep 

1964 # scanning up the whole traceback 

1965 

1966 if frame.f_code in _warning_tags: 

1967 warning_tag_found = True 

1968 (_suffix, _category) = _warning_tags[frame.f_code] 

1969 category = category or _category 

1970 message = f"{message} ({_suffix})" 

1971 

1972 frame = frame.f_back # type: ignore[assignment] 

1973 

1974 if not stacklevel_found: 

1975 stacklevel += 1 

1976 elif stacklevel_found and warning_tag_found: 

1977 break 

1978 

1979 if category is not None: 

1980 warnings.warn(message, category, stacklevel=stacklevel + 1) 

1981 else: 

1982 warnings.warn(message, stacklevel=stacklevel + 1) 

1983 

1984 

1985def only_once( 

1986 fn: Callable[..., _T], retry_on_exception: bool 

1987) -> Callable[..., Optional[_T]]: 

1988 """Decorate the given function to be a no-op after it is called exactly 

1989 once.""" 

1990 

1991 once = [fn] 

1992 

1993 def go(*arg: Any, **kw: Any) -> Optional[_T]: 

1994 # strong reference fn so that it isn't garbage collected, 

1995 # which interferes with the event system's expectations 

1996 strong_fn = fn # noqa 

1997 if once: 

1998 once_fn = once.pop() 

1999 try: 

2000 return once_fn(*arg, **kw) 

2001 except: 

2002 if retry_on_exception: 

2003 once.insert(0, once_fn) 

2004 raise 

2005 

2006 return None 

2007 

2008 return go 

2009 

2010 

2011_SQLA_RE = re.compile(r"sqlalchemy/([a-z_]+/){0,2}[a-z_]+\.py") 

2012_UNITTEST_RE = re.compile(r"unit(?:2|test2?/)") 

2013 

2014 

2015def chop_traceback( 

2016 tb: List[str], 

2017 exclude_prefix: re.Pattern[str] = _UNITTEST_RE, 

2018 exclude_suffix: re.Pattern[str] = _SQLA_RE, 

2019) -> List[str]: 

2020 """Chop extraneous lines off beginning and end of a traceback. 

2021 

2022 :param tb: 

2023 a list of traceback lines as returned by ``traceback.format_stack()`` 

2024 

2025 :param exclude_prefix: 

2026 a regular expression object matching lines to skip at beginning of 

2027 ``tb`` 

2028 

2029 :param exclude_suffix: 

2030 a regular expression object matching lines to skip at end of ``tb`` 

2031 """ 

2032 start = 0 

2033 end = len(tb) - 1 

2034 while start <= end and exclude_prefix.search(tb[start]): 

2035 start += 1 

2036 while start <= end and exclude_suffix.search(tb[end]): 

2037 end -= 1 

2038 return tb[start : end + 1] 

2039 

2040 

2041NoneType = type(None) 

2042 

2043 

2044def attrsetter(attrname): 

2045 code = "def set(obj, value): obj.%s = value" % attrname 

2046 env = locals().copy() 

2047 exec(code, env) 

2048 return env["set"] 

2049 

2050 

2051_dunders = re.compile("^__.+__$") 

2052 

2053 

2054class TypingOnly: 

2055 """A mixin class that marks a class as 'typing only', meaning it has 

2056 absolutely no methods, attributes, or runtime functionality whatsoever. 

2057 

2058 """ 

2059 

2060 __slots__ = () 

2061 

2062 def __init_subclass__(cls) -> None: 

2063 if TypingOnly in cls.__bases__: 

2064 remaining = { 

2065 name for name in cls.__dict__ if not _dunders.match(name) 

2066 } 

2067 if remaining: 

2068 raise AssertionError( 

2069 f"Class {cls} directly inherits TypingOnly but has " 

2070 f"additional attributes {remaining}." 

2071 ) 

2072 super().__init_subclass__() 

2073 

2074 

2075class EnsureKWArg: 

2076 r"""Apply translation of functions to accept \**kw arguments if they 

2077 don't already. 

2078 

2079 Used to ensure cross-compatibility with third party legacy code, for things 

2080 like compiler visit methods that need to accept ``**kw`` arguments, 

2081 but may have been copied from old code that didn't accept them. 

2082 

2083 """ 

2084 

2085 ensure_kwarg: str 

2086 """a regular expression that indicates method names for which the method 

2087 should accept ``**kw`` arguments. 

2088 

2089 The class will scan for methods matching the name template and decorate 

2090 them if necessary to ensure ``**kw`` parameters are accepted. 

2091 

2092 """ 

2093 

2094 def __init_subclass__(cls) -> None: 

2095 fn_reg = cls.ensure_kwarg 

2096 clsdict = cls.__dict__ 

2097 if fn_reg: 

2098 for key in clsdict: 

2099 m = re.match(fn_reg, key) 

2100 if m: 

2101 fn = clsdict[key] 

2102 spec = compat.inspect_getfullargspec(fn) 

2103 if not spec.varkw: 

2104 wrapped = cls._wrap_w_kw(fn) 

2105 setattr(cls, key, wrapped) 

2106 super().__init_subclass__() 

2107 

2108 @classmethod 

2109 def _wrap_w_kw(cls, fn: Callable[..., Any]) -> Callable[..., Any]: 

2110 def wrap(*arg: Any, **kw: Any) -> Any: 

2111 return fn(*arg) 

2112 

2113 return update_wrapper(wrap, fn) 

2114 

2115 

2116def wrap_callable(wrapper, fn): 

2117 """Augment functools.update_wrapper() to work with objects with 

2118 a ``__call__()`` method. 

2119 

2120 :param fn: 

2121 object with __call__ method 

2122 

2123 """ 

2124 if hasattr(fn, "__name__"): 

2125 return update_wrapper(wrapper, fn) 

2126 else: 

2127 _f = wrapper 

2128 _f.__name__ = fn.__class__.__name__ 

2129 if hasattr(fn, "__module__"): 

2130 _f.__module__ = fn.__module__ 

2131 

2132 if hasattr(fn.__call__, "__doc__") and fn.__call__.__doc__: 

2133 _f.__doc__ = fn.__call__.__doc__ 

2134 elif fn.__doc__: 

2135 _f.__doc__ = fn.__doc__ 

2136 

2137 return _f 

2138 

2139 

2140def quoted_token_parser(value): 

2141 """Parse a dotted identifier with accommodation for quoted names. 

2142 

2143 Includes support for SQL-style double quotes as a literal character. 

2144 

2145 E.g.:: 

2146 

2147 >>> quoted_token_parser("name") 

2148 ["name"] 

2149 >>> quoted_token_parser("schema.name") 

2150 ["schema", "name"] 

2151 >>> quoted_token_parser('"Schema"."Name"') 

2152 ['Schema', 'Name'] 

2153 >>> quoted_token_parser('"Schema"."Name""Foo"') 

2154 ['Schema', 'Name""Foo'] 

2155 

2156 """ 

2157 

2158 if '"' not in value: 

2159 return value.split(".") 

2160 

2161 # 0 = outside of quotes 

2162 # 1 = inside of quotes 

2163 state = 0 

2164 result: List[List[str]] = [[]] 

2165 idx = 0 

2166 lv = len(value) 

2167 while idx < lv: 

2168 char = value[idx] 

2169 if char == '"': 

2170 if state == 1 and idx < lv - 1 and value[idx + 1] == '"': 

2171 result[-1].append('"') 

2172 idx += 1 

2173 else: 

2174 state ^= 1 

2175 elif char == "." and state == 0: 

2176 result.append([]) 

2177 else: 

2178 result[-1].append(char) 

2179 idx += 1 

2180 

2181 return ["".join(token) for token in result] 

2182 

2183 

2184def add_parameter_text(params: Any, text: str) -> Callable[[_F], _F]: 

2185 params = _collections.to_list(params) 

2186 

2187 def decorate(fn): 

2188 doc = fn.__doc__ is not None and fn.__doc__ or "" 

2189 if doc: 

2190 doc = inject_param_text(doc, {param: text for param in params}) 

2191 fn.__doc__ = doc 

2192 return fn 

2193 

2194 return decorate 

2195 

2196 

2197def _dedent_docstring(text: str) -> str: 

2198 split_text = text.split("\n", 1) 

2199 if len(split_text) == 1: 

2200 return text 

2201 else: 

2202 firstline, remaining = split_text 

2203 if not firstline.startswith(" "): 

2204 return firstline + "\n" + textwrap.dedent(remaining) 

2205 else: 

2206 return textwrap.dedent(text) 

2207 

2208 

2209def inject_docstring_text( 

2210 given_doctext: Optional[str], injecttext: str, pos: int 

2211) -> str: 

2212 doctext: str = _dedent_docstring(given_doctext or "") 

2213 lines = doctext.split("\n") 

2214 if len(lines) == 1: 

2215 lines.append("") 

2216 injectlines = textwrap.dedent(injecttext).split("\n") 

2217 if injectlines[0]: 

2218 injectlines.insert(0, "") 

2219 

2220 blanks = [num for num, line in enumerate(lines) if not line.strip()] 

2221 blanks.insert(0, 0) 

2222 

2223 inject_pos = blanks[min(pos, len(blanks) - 1)] 

2224 

2225 lines = lines[0:inject_pos] + injectlines + lines[inject_pos:] 

2226 return "\n".join(lines) 

2227 

2228 

2229_param_reg = re.compile(r"(\s+):param (.+?):") 

2230 

2231 

2232def inject_param_text(doctext: str, inject_params: Dict[str, str]) -> str: 

2233 doclines = collections.deque(doctext.splitlines()) 

2234 lines = [] 

2235 

2236 # TODO: this is not working for params like ":param case_sensitive=True:" 

2237 

2238 to_inject = None 

2239 while doclines: 

2240 line = doclines.popleft() 

2241 

2242 m = _param_reg.match(line) 

2243 

2244 if to_inject is None: 

2245 if m: 

2246 param = m.group(2).lstrip("*") 

2247 if param in inject_params: 

2248 # default indent to that of :param: plus one 

2249 indent = " " * len(m.group(1)) + " " 

2250 

2251 # but if the next line has text, use that line's 

2252 # indentation 

2253 if doclines: 

2254 m2 = re.match(r"(\s+)\S", doclines[0]) 

2255 if m2: 

2256 indent = " " * len(m2.group(1)) 

2257 

2258 to_inject = indent + inject_params[param] 

2259 elif m: 

2260 lines.extend(["\n", to_inject, "\n"]) 

2261 to_inject = None 

2262 elif not line.rstrip(): 

2263 lines.extend([line, to_inject, "\n"]) 

2264 to_inject = None 

2265 elif line.endswith("::"): 

2266 # TODO: this still won't cover if the code example itself has 

2267 # blank lines in it, need to detect those via indentation. 

2268 lines.extend([line, doclines.popleft()]) 

2269 continue 

2270 lines.append(line) 

2271 

2272 return "\n".join(lines) 

2273 

2274 

2275def repr_tuple_names(names: List[str]) -> Optional[str]: 

2276 """Trims a list of strings from the middle and return a string of up to 

2277 four elements. Strings greater than 11 characters will be truncated""" 

2278 if len(names) == 0: 

2279 return None 

2280 flag = len(names) <= 4 

2281 names = names[0:4] if flag else names[0:3] + names[-1:] 

2282 res = ["%s.." % name[:11] if len(name) > 11 else name for name in names] 

2283 if flag: 

2284 return ", ".join(res) 

2285 else: 

2286 return "%s, ..., %s" % (", ".join(res[0:3]), res[-1]) 

2287 

2288 

2289def has_compiled_ext(raise_=False): 

2290 if HAS_CYEXTENSION: 

2291 return True 

2292 elif raise_: 

2293 raise ImportError( 

2294 "cython extensions were expected to be installed, " 

2295 "but are not present" 

2296 ) 

2297 else: 

2298 return False 

2299 

2300 

2301class _Missing(enum.Enum): 

2302 Missing = enum.auto() 

2303 

2304 

2305Missing = _Missing.Missing 

2306MissingOr = Union[_T, Literal[_Missing.Missing]]