Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/utils.py: 37%

122 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:07 +0000

1from __future__ import annotations 

2 

3import os 

4import signal 

5import sys 

6import threading 

7from collections import deque 

8from typing import ( 

9 Callable, 

10 ContextManager, 

11 Deque, 

12 Dict, 

13 Generator, 

14 Generic, 

15 List, 

16 Optional, 

17 TypeVar, 

18 Union, 

19) 

20 

21from wcwidth import wcwidth 

22 

# Public names of this module. Every public helper defined below is listed
# here, including `is_windows_vt100_supported`, which sits alongside
# `is_windows` and `is_conemu_ansi`.
__all__ = [
    "Event",
    "DummyContext",
    "get_cwidth",
    "suspend_to_background_supported",
    "is_conemu_ansi",
    "is_windows",
    "is_windows_vt100_supported",
    "in_main_thread",
    "get_bell_environment_variable",
    "get_term_environment_variable",
    "take_using_weights",
    "to_str",
    "to_int",
    "AnyFloat",
    "to_float",
    "is_dumb_terminal",
]

# Used to ensure sphinx autodoc does not try to import platform-specific
# stuff when documenting win32.py modules.
# (True when "sphinx.ext.autodoc" is already loaded at import time of this
# module.)
SPHINX_AUTODOC_RUNNING = "sphinx.ext.autodoc" in sys.modules

44 

_Sender = TypeVar("_Sender", covariant=True)


class Event(Generic[_Sender]):
    """
    Simple event to which event handlers can be attached. For instance::

        class Cls:
            def __init__(self):
                # Define event. The first parameter is the sender.
                self.event = Event(self)

        obj = Cls()

        def handler(sender):
            pass

        # Add event handler by using the += operator.
        obj.event += handler

        # Fire event.
        obj.event()

    Handlers are called in the order in which they were added. A handler may
    add or remove handlers (including itself) while the event is firing;
    such changes take effect from the next time the event is fired.
    """

    def __init__(
        self, sender: _Sender, handler: Callable[[_Sender], None] | None = None
    ) -> None:
        """
        :param sender: Object that is passed to every handler when firing.
        :param handler: Optional first handler to attach right away.
        """
        self.sender = sender
        self._handlers: list[Callable[[_Sender], None]] = []

        if handler is not None:
            self += handler

    def __call__(self) -> None:
        "Fire event: call every attached handler with the sender."
        # Iterate over a snapshot. Iterating the live list while a handler
        # removes itself would shift the remaining entries and silently skip
        # the next handler.
        for handler in list(self._handlers):
            handler(self.sender)

    def fire(self) -> None:
        "Alias for just calling the event."
        self()

    def add_handler(self, handler: Callable[[_Sender], None]) -> None:
        """
        Add another handler to this callback.
        (Handler should be a callable that takes exactly one parameter: the
        sender object.)
        """
        # Add to list of event handlers.
        self._handlers.append(handler)

    def remove_handler(self, handler: Callable[[_Sender], None]) -> None:
        """
        Remove a handler from this callback.

        Removing a handler that is not attached is a no-op. If the same
        handler was added several times, only one occurrence is removed.
        """
        if handler in self._handlers:
            self._handlers.remove(handler)

    def __iadd__(self, handler: Callable[[_Sender], None]) -> Event[_Sender]:
        """
        `event += handler` notation for adding a handler.
        """
        self.add_handler(handler)
        return self

    def __isub__(self, handler: Callable[[_Sender], None]) -> Event[_Sender]:
        """
        `event -= handler` notation for removing a handler.
        """
        self.remove_handler(handler)
        return self

116 

117 

class DummyContext(ContextManager[None]):
    """
    Context manager that does nothing.

    Entering yields ``None`` and exiting returns ``None``, so an exception
    raised inside the ``with`` body is not suppressed.
    (contextlib.nested is not available on Py3)
    """

    def __enter__(self) -> None:
        return None

    def __exit__(self, *exc_info: object) -> None:
        return None

128 

129 

class _CharSizesCache(Dict[str, int]):
    """
    Dictionary that computes the width of a string on first lookup and
    remembers it (cache for wcwidth sizes).
    """

    LONG_STRING_MIN_LEN = 64  # Minimum string length for considering it long.
    MAX_LONG_STRINGS = 16  # Maximum number of long strings to remember.

    def __init__(self) -> None:
        super().__init__()
        # Long keys currently held in the cache, oldest first.
        self._long_strings: Deque[str] = deque()

    def __missing__(self, string: str) -> int:
        """
        Compute, store and return the width of `string` on a cache miss.
        """
        if len(string) == 1:
            # Clamp at zero: some non printable control characters, like
            # e.g. Ctrl-underscore, get a -1 wcwidth value, and such
            # characters can end up in the input text.
            width = max(0, wcwidth(string))
        else:
            # Width of a multi-character string is the sum of the widths of
            # its characters; each lookup goes through this same cache.
            width = 0
            for char in string:
                width += self[char]

        # Store in cache.
        self[string] = width

        # Rotate long strings: only a limited number of them is kept.
        # (It's hard to tell what we can consider short...)
        if len(string) > self.LONG_STRING_MIN_LEN:
            self._long_strings.append(string)

            if len(self._long_strings) > self.MAX_LONG_STRINGS:
                oldest = self._long_strings.popleft()
                self.pop(oldest, None)

        return width


_CHAR_SIZES_CACHE = _CharSizesCache()


def get_cwidth(string: str) -> int:
    """
    Return width of a string. Wrapper around ``wcwidth``; results are looked
    up in (and added to) the module-level cache.
    """
    cache = _CHAR_SIZES_CACHE
    return cache[string]

179 

180 

def suspend_to_background_supported() -> bool:
    """
    Returns `True` when the Python implementation supports
    suspend-to-background, which is the case when the `signal` module
    provides `SIGTSTP`. This is typically `False` on Windows systems.
    """
    return getattr(signal, "SIGTSTP", None) is not None

187 

188 

def is_windows() -> bool:
    """
    True when we are using Windows, i.e. when `sys.platform` is "win32".
    """
    # Any other platform string (e.g. 'darwin' or 'linux2') gives False.
    platform_name = sys.platform
    return platform_name == "win32"

194 

195 

def is_windows_vt100_supported() -> bool:
    """
    True when we are using Windows, but VT100 escape sequences are supported.

    Always False on platforms other than Windows.
    """
    if sys.platform != "win32":
        return False

    # Import needs to be inline. Windows libraries are not always available.
    from prompt_toolkit.output.windows10 import is_win_vt100_enabled

    return is_win_vt100_enabled()

207 

208 

def is_conemu_ansi() -> bool:
    """
    True when the ConEmu Windows console is used: we are on Windows and the
    `ConEmuANSI` environment variable equals "ON".
    """
    if sys.platform != "win32":
        return False

    return os.environ.get("ConEmuANSI", "OFF") == "ON"

214 

215 

def in_main_thread() -> bool:
    """
    True when the current thread is the main thread.

    The current thread is compared by identity with the thread object that
    `threading.main_thread()` reports, rather than by the name of a private
    class of the `threading` module.
    """
    return threading.current_thread() is threading.main_thread()

221 

222 

def get_bell_environment_variable() -> bool:
    """
    True if the `PROMPT_TOOLKIT_BELL` env variable is set to true
    (true, TRUE, TrUe, 1). An unset variable counts as "true".
    """
    return os.environ.get("PROMPT_TOOLKIT_BELL", "true").lower() in ("1", "true")

229 

230 

def get_term_environment_variable() -> str:
    "Return the $TERM environment variable (empty string when it is unset)."
    return os.getenv("TERM", "")

234 

235 

_T = TypeVar("_T")


def take_using_weights(
    items: list[_T], weights: list[int]
) -> Generator[_T, None, None]:
    """
    Generator that keeps yielding items from the items list, in proportion to
    their weight. For instance::

        # Getting the first 70 items from this generator should have yielded 10
        # times A, 20 times B and 40 times C, all distributed equally..
        take_using_weights(['A', 'B', 'C'], [5, 10, 20])

    The generator never terminates by itself; the consumer decides how many
    items to take. Because this is a generator, the checks below only run
    when the first item is requested.

    :param items: List of items to take from.
    :param weights: Integers representing the weight. (Numbers have to be
                    integers, not floats.)
    :raises ValueError: When no item has a positive weight.
    """
    assert len(items) == len(weights)
    assert len(items) > 0

    # Remove items with zero-weight.
    weighted = [(item, w) for item, w in zip(items, weights) if w > 0]

    # Make sure that we have some items left.
    if not weighted:
        raise ValueError("Did't got any items with a positive weight.")

    # Number of times each remaining item has been yielded so far.
    already_taken = [0] * len(weighted)
    max_weight = max(w for _, w in weighted)

    i = 0
    while True:
        # Each iteration of this loop, every item is topped up until it has
        # been yielded `i * weight / max_weight` times (rounded up).
        adding = True
        while adding:
            adding = False

            for item_i, (item, weight) in enumerate(weighted):
                # Same test as `already_taken < i * weight / max_weight`,
                # cross-multiplied (max_weight > 0) so that it stays exact
                # integer arithmetic, also for very large weights.
                if already_taken[item_i] * max_weight < i * weight:
                    yield item
                    already_taken[item_i] += 1
                    adding = True

        i += 1

291 

292 

def to_str(value: Callable[[], str] | str) -> str:
    "Turn callable or string into string (callables are called, recursively)."
    if not callable(value):
        return str(value)

    return to_str(value())

299 

300 

def to_int(value: Callable[[], int] | int) -> int:
    "Turn callable or int into int (callables are called, recursively)."
    if not callable(value):
        return int(value)

    return to_int(value())

307 

308 

# Either a float, or a zero-argument callable that produces one.
AnyFloat = Union[Callable[[], float], float]


def to_float(value: AnyFloat) -> float:
    "Turn callable or float into float (callables are called, recursively)."
    if not callable(value):
        return float(value)

    return to_float(value())

318 

319 

def is_dumb_terminal(term: str | None = None) -> bool:
    """
    True if this terminal type is considered "dumb".

    If so, we should fall back to the simplest possible form of line editing,
    without cursor positioning and color support.

    :param term: Terminal type to check. When `None`, the value of the
        `TERM` environment variable is used (empty string when unset).
    """
    if term is None:
        term = os.environ.get("TERM", "")

    # The comparison is case-insensitive.
    return term.lower() in ["dumb", "unknown"]