Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/prompt_toolkit/utils.py: 37%

122 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:05 +0000

1from __future__ import annotations 

2 

3import os 

4import signal 

5import sys 

6import threading 

7from collections import deque 

8from typing import ( 

9 Callable, 

10 ContextManager, 

11 Deque, 

12 Dict, 

13 Generator, 

14 Generic, 

15 TypeVar, 

16 Union, 

17) 

18 

19from wcwidth import wcwidth 

20 

21__all__ = [ 

22 "Event", 

23 "DummyContext", 

24 "get_cwidth", 

25 "suspend_to_background_supported", 

26 "is_conemu_ansi", 

27 "is_windows", 

28 "in_main_thread", 

29 "get_bell_environment_variable", 

30 "get_term_environment_variable", 

31 "take_using_weights", 

32 "to_str", 

33 "to_int", 

34 "AnyFloat", 

35 "to_float", 

36 "is_dumb_terminal", 

37] 

38 

# True when the Sphinx autodoc extension has been loaded into this
# interpreter. It exists so that documenting the win32.py modules does not
# pull in platform-specific imports.
SPHINX_AUTODOC_RUNNING = "sphinx.ext.autodoc" in sys.modules

42 

_Sender = TypeVar("_Sender", covariant=True)


class Event(Generic[_Sender]):
    """
    Simple event to which event handlers can be attached. For instance::

        class Cls:
            def __init__(self):
                # Define event. The first parameter is the sender.
                self.event = Event(self)

        obj = Cls()

        def handler(sender):
            pass

        # Add event handler by using the += operator.
        obj.event += handler

        # Fire event.
        obj.event()

    Handlers are called in the order in which they were added. Firing works
    on a snapshot of the handler list, so a handler may add or remove
    handlers (including itself) while the event is being fired; such changes
    only take effect the next time the event is fired.

    :param sender: Object that is passed as the only argument to every
        handler.
    :param handler: Optional first handler to attach right away.
    """

    def __init__(
        self, sender: _Sender, handler: Callable[[_Sender], None] | None = None
    ) -> None:
        self.sender = sender
        self._handlers: list[Callable[[_Sender], None]] = []

        if handler is not None:
            self += handler

    def __call__(self) -> None:
        "Fire event: call every attached handler with the sender."
        # Iterate over a copy. Looping over `self._handlers` itself would skip
        # the next handler whenever a handler removes itself during the loop,
        # and would never terminate if a handler keeps adding handlers.
        for handler in list(self._handlers):
            handler(self.sender)

    def fire(self) -> None:
        "Alias for just calling the event."
        self()

    def add_handler(self, handler: Callable[[_Sender], None]) -> None:
        """
        Add another handler to this callback.
        (Handler should be a callable that takes exactly one parameter: the
        sender object.)
        """
        # Add to list of event handlers.
        self._handlers.append(handler)

    def remove_handler(self, handler: Callable[[_Sender], None]) -> None:
        """
        Remove a handler from this callback.

        Does nothing when the handler is not attached. When the same handler
        was attached several times, only one occurrence is removed.
        """
        if handler in self._handlers:
            self._handlers.remove(handler)

    def __iadd__(self, handler: Callable[[_Sender], None]) -> Event[_Sender]:
        """
        `event += handler` notation for adding a handler.
        """
        self.add_handler(handler)
        return self

    def __isub__(self, handler: Callable[[_Sender], None]) -> Event[_Sender]:
        """
        `event -= handler` notation for removing a handler.
        """
        self.remove_handler(handler)
        return self

114 

115 

class DummyContext(ContextManager[None]):
    """
    Context manager that does nothing on entry and nothing on exit.

    (contextlib.nested is not available on Py3)
    """

    def __enter__(self) -> None:
        # Nothing to set up; the `with ... as` target is bound to None.
        return None

    def __exit__(self, *a: object) -> None:
        # Nothing to clean up. Returning None means exceptions raised inside
        # the `with` body are not suppressed.
        return None

126 

127 

class _CharSizesCache(Dict[str, int]):
    """
    Dictionary mapping strings to their width, filled in lazily.

    Looking up a string that is not present computes its width (through
    `wcwidth` for single characters, by summing the per-character entries
    otherwise) and stores it. Only a limited number of "long" strings is kept
    around; the oldest one is dropped when that limit is exceeded.
    """

    LONG_STRING_MIN_LEN = 64  # Minimum string length for considering it long.
    MAX_LONG_STRINGS = 16  # Maximum number of long strings to remember.

    def __init__(self) -> None:
        super().__init__()
        # The "long" keys that were stored, oldest first.
        self._long_strings: Deque[str] = deque()

    def __missing__(self, string: str) -> int:
        width: int
        if len(string) != 1:
            # Empty or multi-character string: add up the widths of the
            # individual characters (each of which is cached as well).
            width = sum(self[char] for char in string)
        else:
            # Some non printable control characters (e.g. Ctrl-underscore)
            # get a -1 wcwidth value, and such characters can end up in the
            # input text. Clamp to zero so that widths never go negative.
            width = max(0, wcwidth(string))

        # Remember the outcome for the next lookup.
        self[string] = width

        # Short strings stay cached indefinitely.
        # (It's hard to tell what we can consider short...)
        if len(string) <= self.LONG_STRING_MIN_LEN:
            return width

        # Long string: record it, and forget the oldest long string as soon
        # as more than `MAX_LONG_STRINGS` of them are being tracked.
        tracked = self._long_strings
        tracked.append(string)

        if len(tracked) > self.MAX_LONG_STRINGS:
            oldest = tracked.popleft()
            if oldest in self:
                del self[oldest]

        return width

167 

168 

# Single cache instance shared by every `get_cwidth` call in this module.
_CHAR_SIZES_CACHE = _CharSizesCache()


def get_cwidth(string: str) -> int:
    """
    Return the width of `string`, as derived from ``wcwidth``.

    The value comes from the module-level cache, which computes and stores
    it the first time a string is looked up.
    """
    width = _CHAR_SIZES_CACHE[string]
    return width

177 

178 

def suspend_to_background_supported() -> bool:
    """
    Tell whether the Python implementation supports suspend-to-background,
    i.e. whether the `signal` module has a `SIGTSTP` attribute. This is
    typically `False` on Windows systems.
    """
    missing = object()
    return getattr(signal, "SIGTSTP", missing) is not missing

185 

186 

def is_windows() -> bool:
    """
    Report whether `sys.platform` identifies a Windows system.
    """
    platform = sys.platform
    return platform == "win32"  # As opposed to e.g. 'darwin' or 'linux2'.

192 

193 

def is_windows_vt100_supported() -> bool:
    """
    True when we are on Windows and `is_win_vt100_enabled()` reports that
    VT100 escape sequences are supported. Always False on other platforms.
    """
    if sys.platform != "win32":
        return False

    # Keep this import inline: Windows libraries are not always available,
    # so it must only run once we know that we are on Windows.
    from prompt_toolkit.output.windows10 import is_win_vt100_enabled

    return is_win_vt100_enabled()

205 

206 

def is_conemu_ansi() -> bool:
    """
    True when running on Windows with the `ConEmuANSI` environment variable
    set to "ON" (which is how the ConEmu console is detected).
    """
    if sys.platform != "win32":
        return False
    return os.environ.get("ConEmuANSI", "OFF") == "ON"

212 

213 

def in_main_thread() -> bool:
    """
    True when the current thread is the main thread.

    The check compares the current thread object with the one returned by
    `threading.main_thread()`, rather than relying on the name of a private
    class inside the `threading` module.
    """
    return threading.current_thread() is threading.main_thread()

219 

220 

def get_bell_environment_variable() -> bool:
    """
    Read the `PROMPT_TOOLKIT_BELL` environment variable as a boolean.

    True when it is "1" or any capitalisation of "true" (true, TRUE, True),
    and also when the variable is not set at all. False for anything else.
    """
    setting = os.environ.get("PROMPT_TOOLKIT_BELL", "true").lower()
    return setting == "1" or setting == "true"

227 

228 

def get_term_environment_variable() -> str:
    "Look up $TERM in the environment; an empty string when it is not set."
    term = os.environ.get("TERM", "")
    return term

232 

233 

_T = TypeVar("_T")


def take_using_weights(
    items: list[_T], weights: list[int]
) -> Generator[_T, None, None]:
    """
    Endless generator that yields entries of `items`, each one in proportion
    to its weight. For instance::

        # Getting the first 70 items from this generator should have yielded 10
        # times A, 20 times B and 40 times C, all distributed equally..
        take_using_weights(['A', 'B', 'C'], [5, 10, 20])

    Items whose weight is not positive are never yielded.

    :param items: List of items to take from.
    :param weights: Integers representing the weight. (Numbers have to be
                    integers, not floats.)
    :raises ValueError: When none of the items has a positive weight. (Like
        the assertions, this happens on the first `next()` call, because this
        is a generator.)
    """
    assert len(items) == len(weights)
    assert len(items) > 0

    # Keep only the (item, weight) pairs that have a positive weight.
    weighted = [(item, w) for item, w in zip(items, weights) if w > 0]

    # Make sure that we have some items left.
    if not weighted:
        raise ValueError("Did't got any items with a positive weight.")

    max_weight = float(max(w for _, w in weighted))
    taken = [0] * len(weighted)  # How often each item was yielded so far.

    round_nr = 0
    while True:
        # In every round, each item is topped up to its quota for that round,
        # which is `round_nr * weight / max_weight`. Keep sweeping over the
        # items until a full sweep yields nothing.
        progressed = True
        while progressed:
            progressed = False

            for index, (item, weight) in enumerate(weighted):
                if taken[index] < round_nr * weight / max_weight:
                    yield item
                    taken[index] += 1
                    progressed = True

        round_nr += 1

289 

290 

def to_str(value: Callable[[], str] | str) -> str:
    "Resolve `value` to a string; a callable is called first and its result resolved in turn."
    if not callable(value):
        return str(value)
    return to_str(value())

297 

298 

def to_int(value: Callable[[], int] | int) -> int:
    "Resolve `value` to an int; a callable is called first and its result resolved in turn."
    if not callable(value):
        return int(value)
    return to_int(value())

305 

306 

# Either a float, or a callable without arguments that returns one.
AnyFloat = Union[Callable[[], float], float]


def to_float(value: AnyFloat) -> float:
    "Resolve `value` to a float; a callable is called first and its result resolved in turn."
    if not callable(value):
        return float(value)
    return to_float(value())

316 

317 

def is_dumb_terminal(term: str | None = None) -> bool:
    """
    Tell whether a terminal type counts as "dumb", i.e. whether its name is
    "dumb" or "unknown" (compared case-insensitively).

    For such terminals we should fall back to the simplest possible form of
    line editing, without cursor positioning and color support.

    :param term: Terminal type to check. When `None`, the `TERM` environment
        variable is used (an empty string if it is not set).
    """
    name = os.environ.get("TERM", "") if term is None else term
    return name.lower() in ["dumb", "unknown"]