Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/prompt_toolkit/utils.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

125 statements  

1from __future__ import annotations 

2 

3import os 

4import signal 

5import sys 

6import threading 

7from collections import deque 

8from collections.abc import Callable, Generator 

9from contextlib import AbstractContextManager 

10from typing import ( 

11 Generic, 

12 TypeVar, 

13) 

14 

15from wcwidth import wcwidth 

16 

17__all__ = [ 

18 "Event", 

19 "DummyContext", 

20 "get_cwidth", 

21 "suspend_to_background_supported", 

22 "is_conemu_ansi", 

23 "is_windows", 

24 "in_main_thread", 

25 "get_bell_environment_variable", 

26 "get_term_environment_variable", 

27 "take_using_weights", 

28 "to_str", 

29 "to_int", 

30 "AnyFloat", 

31 "to_float", 

32 "is_dumb_terminal", 

33] 

34 

35# Used to ensure sphinx autodoc does not try to import platform-specific 

36# stuff when documenting win32.py modules. 

37SPHINX_AUTODOC_RUNNING = "sphinx.ext.autodoc" in sys.modules 

38 

39_Sender = TypeVar("_Sender", covariant=True) 

40 

41 

42class Event(Generic[_Sender]): 

43 """ 

44 Simple event to which event handlers can be attached. For instance:: 

45 

46 class Cls: 

47 def __init__(self): 

48 # Define event. The first parameter is the sender. 

49 self.event = Event(self) 

50 

51 obj = Cls() 

52 

53 def handler(sender): 

54 pass 

55 

56 # Add event handler by using the += operator. 

57 obj.event += handler 

58 

59 # Fire event. 

60 obj.event() 

61 """ 

62 

63 def __init__( 

64 self, sender: _Sender, handler: Callable[[_Sender], None] | None = None 

65 ) -> None: 

66 self.sender = sender 

67 self._handlers: list[Callable[[_Sender], None]] = [] 

68 

69 if handler is not None: 

70 self += handler 

71 

72 def __call__(self) -> None: 

73 "Fire event." 

74 for handler in self._handlers: 

75 handler(self.sender) 

76 

77 def fire(self) -> None: 

78 "Alias for just calling the event." 

79 self() 

80 

81 def add_handler(self, handler: Callable[[_Sender], None]) -> None: 

82 """ 

83 Add another handler to this callback. 

84 (Handler should be a callable that takes exactly one parameter: the 

85 sender object.) 

86 """ 

87 # Add to list of event handlers. 

88 self._handlers.append(handler) 

89 

90 def remove_handler(self, handler: Callable[[_Sender], None]) -> None: 

91 """ 

92 Remove a handler from this callback. 

93 """ 

94 if handler in self._handlers: 

95 self._handlers.remove(handler) 

96 

97 def __iadd__(self, handler: Callable[[_Sender], None]) -> Event[_Sender]: 

98 """ 

99 `event += handler` notation for adding a handler. 

100 """ 

101 self.add_handler(handler) 

102 return self 

103 

104 def __isub__(self, handler: Callable[[_Sender], None]) -> Event[_Sender]: 

105 """ 

106 `event -= handler` notation for removing a handler. 

107 """ 

108 self.remove_handler(handler) 

109 return self 

110 

111 

112class DummyContext(AbstractContextManager[None]): 

113 """ 

114 (contextlib.nested is not available on Py3) 

115 """ 

116 

117 def __enter__(self) -> None: 

118 pass 

119 

120 def __exit__(self, *a: object) -> None: 

121 pass 

122 

123 

124class _CharSizesCache(dict[str, int]): 

125 """ 

126 Cache for wcwidth sizes. 

127 """ 

128 

129 LONG_STRING_MIN_LEN = 64 # Minimum string length for considering it long. 

130 MAX_LONG_STRINGS = 16 # Maximum number of long strings to remember. 

131 

132 def __init__(self) -> None: 

133 super().__init__() 

134 # Keep track of the "long" strings in this cache. 

135 self._long_strings: deque[str] = deque() 

136 

137 def __missing__(self, string: str) -> int: 

138 # Note: We use the `max(0, ...` because some non printable control 

139 # characters, like e.g. Ctrl-underscore get a -1 wcwidth value. 

140 # It can be possible that these characters end up in the input 

141 # text. 

142 result: int 

143 if len(string) == 1: 

144 result = max(0, wcwidth(string)) 

145 else: 

146 result = sum(self[c] for c in string) 

147 

148 # Store in cache. 

149 self[string] = result 

150 

151 # Rotate long strings. 

152 # (It's hard to tell what we can consider short...) 

153 if len(string) > self.LONG_STRING_MIN_LEN: 

154 long_strings = self._long_strings 

155 long_strings.append(string) 

156 

157 if len(long_strings) > self.MAX_LONG_STRINGS: 

158 key_to_remove = long_strings.popleft() 

159 if key_to_remove in self: 

160 del self[key_to_remove] 

161 

162 return result 

163 

164 

165_CHAR_SIZES_CACHE = _CharSizesCache() 

166 

167 

168def get_cwidth(string: str) -> int: 

169 """ 

170 Return width of a string. Wrapper around ``wcwidth``. 

171 """ 

172 return _CHAR_SIZES_CACHE[string] 

173 

174 

175def suspend_to_background_supported() -> bool: 

176 """ 

177 Returns `True` when the Python implementation supports 

178 suspend-to-background. This is typically `False' on Windows systems. 

179 """ 

180 return hasattr(signal, "SIGTSTP") 

181 

182 

183def is_windows() -> bool: 

184 """ 

185 True when we are using Windows. 

186 """ 

187 return sys.platform == "win32" # Not 'darwin' or 'linux2' 

188 

189 

190def is_windows_vt100_supported() -> bool: 

191 """ 

192 True when we are using Windows, but VT100 escape sequences are supported. 

193 """ 

194 if sys.platform == "win32": 

195 # Import needs to be inline. Windows libraries are not always available. 

196 from prompt_toolkit.output.windows10 import is_win_vt100_enabled 

197 

198 return is_win_vt100_enabled() 

199 

200 return False 

201 

202 

203def is_conemu_ansi() -> bool: 

204 """ 

205 True when the ConEmu Windows console is used. 

206 """ 

207 return sys.platform == "win32" and os.environ.get("ConEmuANSI", "OFF") == "ON" 

208 

209 

210def in_main_thread() -> bool: 

211 """ 

212 True when the current thread is the main thread. 

213 """ 

214 return threading.current_thread().__class__.__name__ == "_MainThread" 

215 

216 

217def get_bell_environment_variable() -> bool: 

218 """ 

219 True if env variable is set to true (true, TRUE, True, 1). 

220 """ 

221 value = os.environ.get("PROMPT_TOOLKIT_BELL", "true") 

222 return value.lower() in ("1", "true") 

223 

224 

225def get_term_environment_variable() -> str: 

226 "Return the $TERM environment variable." 

227 return os.environ.get("TERM", "") 

228 

229 

230_T = TypeVar("_T") 

231 

232 

233def take_using_weights( 

234 items: list[_T], weights: list[int] 

235) -> Generator[_T, None, None]: 

236 """ 

237 Generator that keeps yielding items from the items list, in proportion to 

238 their weight. For instance:: 

239 

240 # Getting the first 70 items from this generator should have yielded 10 

241 # times A, 20 times B and 40 times C, all distributed equally.. 

242 take_using_weights(['A', 'B', 'C'], [5, 10, 20]) 

243 

244 :param items: List of items to take from. 

245 :param weights: Integers representing the weight. (Numbers have to be 

246 integers, not floats.) 

247 """ 

248 assert len(items) == len(weights) 

249 assert len(items) > 0 

250 

251 # Remove items with zero-weight. 

252 items2 = [] 

253 weights2 = [] 

254 for item, w in zip(items, weights): 

255 if w > 0: 

256 items2.append(item) 

257 weights2.append(w) 

258 

259 items = items2 

260 weights = weights2 

261 

262 # Make sure that we have some items left. 

263 if not items: 

264 raise ValueError("Did't got any items with a positive weight.") 

265 

266 # 

267 already_taken = [0 for i in items] 

268 item_count = len(items) 

269 max_weight = max(weights) 

270 

271 i = 0 

272 while True: 

273 # Each iteration of this loop, we fill up until by (total_weight/max_weight). 

274 adding = True 

275 while adding: 

276 adding = False 

277 

278 for item_i, item, weight in zip(range(item_count), items, weights): 

279 if already_taken[item_i] < i * weight / float(max_weight): 

280 yield item 

281 already_taken[item_i] += 1 

282 adding = True 

283 

284 i += 1 

285 

286 

287def to_str(value: Callable[[], str] | str) -> str: 

288 "Turn callable or string into string." 

289 if callable(value): 

290 return to_str(value()) 

291 else: 

292 return str(value) 

293 

294 

295def to_int(value: Callable[[], int] | int) -> int: 

296 "Turn callable or int into int." 

297 if callable(value): 

298 return to_int(value()) 

299 else: 

300 return int(value) 

301 

302 

303AnyFloat = Callable[[], float] | float 

304 

305 

306def to_float(value: AnyFloat) -> float: 

307 "Turn callable or float into float." 

308 if callable(value): 

309 return to_float(value()) 

310 else: 

311 return float(value) 

312 

313 

314def is_dumb_terminal(term: str | None = None) -> bool: 

315 """ 

316 True if this terminal type is considered "dumb". 

317 

318 If so, we should fall back to the simplest possible form of line editing, 

319 without cursor positioning and color support. 

320 """ 

321 if term is None: 

322 return is_dumb_terminal(os.environ.get("TERM", "")) 

323 

324 return term.lower() in ["dumb", "unknown"]