Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/prompt_toolkit/eventloop/inputhook.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

74 statements  

1""" 

2Similar to `PyOS_InputHook` of the Python API, we can plug in an input hook in 

3the asyncio event loop. 

4 

5The way this works is by using a custom 'selector' that runs the other event 

6loop until the real selector is ready. 

7 

8It's the responsibility of this event hook to return when there is input ready. 

9There are two ways to detect when input is ready: 

10 

11The inputhook itself is a callable that receives an `InputHookContext`. This 

12callable should run the other event loop, and return when the main loop has 

13stuff to do. There are two ways to detect when to return: 

14 

15- Call the `input_is_ready` method periodically. Quit when this returns `True`. 

16 

17- Add the `fileno` as a watch to the external eventloop. Quit when file descriptor 

18 becomes readable. (But don't read from it.) 

19 

20 Note that this is not the same as checking for `sys.stdin.fileno()`. The 

21 eventloop of prompt-toolkit allows thread-based executors, for example for 

22 asynchronous autocompletion. When the completion for instance is ready, we 

23 also want prompt-toolkit to gain control again in order to display that. 

24""" 

25 

26from __future__ import annotations 

27 

28import asyncio 

29import os 

30import select 

31import selectors 

32import sys 

33import threading 

34from asyncio import AbstractEventLoop, get_running_loop 

35from collections.abc import Callable, Mapping 

36from selectors import BaseSelector, SelectorKey 

37from typing import TYPE_CHECKING, Any 

38 

39__all__ = [ 

40 "new_eventloop_with_inputhook", 

41 "set_eventloop_with_inputhook", 

42 "InputHookSelector", 

43 "InputHookContext", 

44 "InputHook", 

45] 

46 

47if TYPE_CHECKING: 

48 from typing import TypeAlias 

49 

50 from _typeshed import FileDescriptorLike 

51 

52 _EventMask = int 

53 

54 

55class InputHookContext: 

56 """ 

57 Given as a parameter to the inputhook. 

58 """ 

59 

60 def __init__(self, fileno: int, input_is_ready: Callable[[], bool]) -> None: 

61 self._fileno = fileno 

62 self.input_is_ready = input_is_ready 

63 

64 def fileno(self) -> int: 

65 return self._fileno 

66 

67 

68InputHook: TypeAlias = Callable[[InputHookContext], None] 

69 

70 

71def new_eventloop_with_inputhook( 

72 inputhook: Callable[[InputHookContext], None], 

73) -> AbstractEventLoop: 

74 """ 

75 Create a new event loop with the given inputhook. 

76 """ 

77 selector = InputHookSelector(selectors.DefaultSelector(), inputhook) 

78 loop = asyncio.SelectorEventLoop(selector) 

79 return loop 

80 

81 

82def set_eventloop_with_inputhook( 

83 inputhook: Callable[[InputHookContext], None], 

84) -> AbstractEventLoop: 

85 """ 

86 Create a new event loop with the given inputhook, and activate it. 

87 """ 

88 # Deprecated! 

89 

90 loop = new_eventloop_with_inputhook(inputhook) 

91 asyncio.set_event_loop(loop) 

92 return loop 

93 

94 

95class InputHookSelector(BaseSelector): 

96 """ 

97 Usage: 

98 

99 selector = selectors.SelectSelector() 

100 loop = asyncio.SelectorEventLoop(InputHookSelector(selector, inputhook)) 

101 asyncio.set_event_loop(loop) 

102 """ 

103 

104 def __init__( 

105 self, selector: BaseSelector, inputhook: Callable[[InputHookContext], None] 

106 ) -> None: 

107 self.selector = selector 

108 self.inputhook = inputhook 

109 self._r, self._w = os.pipe() 

110 

111 def register( 

112 self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None 

113 ) -> SelectorKey: 

114 return self.selector.register(fileobj, events, data=data) 

115 

116 def unregister(self, fileobj: FileDescriptorLike) -> SelectorKey: 

117 return self.selector.unregister(fileobj) 

118 

119 def modify( 

120 self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None 

121 ) -> SelectorKey: 

122 return self.selector.modify(fileobj, events, data=None) 

123 

124 def select( 

125 self, timeout: float | None = None 

126 ) -> list[tuple[SelectorKey, _EventMask]]: 

127 # If there are tasks in the current event loop, 

128 # don't run the input hook. 

129 if len(getattr(get_running_loop(), "_ready", [])) > 0: 

130 return self.selector.select(timeout=timeout) 

131 

132 ready = False 

133 result = None 

134 

135 # Run selector in other thread. 

136 def run_selector() -> None: 

137 nonlocal ready, result 

138 result = self.selector.select(timeout=timeout) 

139 os.write(self._w, b"x") 

140 ready = True 

141 

142 th = threading.Thread(target=run_selector) 

143 th.start() 

144 

145 def input_is_ready() -> bool: 

146 return ready 

147 

148 # Call inputhook. 

149 # The inputhook function is supposed to return when our selector 

150 # becomes ready. The inputhook can do that by registering the fd in its 

151 # own loop, or by checking the `input_is_ready` function regularly. 

152 self.inputhook(InputHookContext(self._r, input_is_ready)) 

153 

154 # Flush the read end of the pipe. 

155 try: 

156 # Before calling 'os.read', call select.select. This is required 

157 # when the gevent monkey patch has been applied. 'os.read' is never 

158 # monkey patched and won't be cooperative, so that would block all 

159 # other select() calls otherwise. 

160 # See: http://www.gevent.org/gevent.os.html 

161 

162 # Note: On Windows, this is apparently not an issue. 

163 # However, if we would ever want to add a select call, it 

164 # should use `windll.kernel32.WaitForMultipleObjects`, 

165 # because `select.select` can't wait for a pipe on Windows. 

166 if sys.platform != "win32": 

167 select.select([self._r], [], [], None) 

168 

169 os.read(self._r, 1024) 

170 except OSError: 

171 # This happens when the window resizes and a SIGWINCH was received. 

172 # We get 'Error: [Errno 4] Interrupted system call' 

173 # Just ignore. 

174 pass 

175 

176 # Wait for the real selector to be done. 

177 th.join() 

178 assert result is not None 

179 return result 

180 

181 def close(self) -> None: 

182 """ 

183 Clean up resources. 

184 """ 

185 if self._r: 

186 os.close(self._r) 

187 os.close(self._w) 

188 

189 self._r = self._w = -1 

190 self.selector.close() 

191 

192 def get_map(self) -> Mapping[FileDescriptorLike, SelectorKey]: 

193 return self.selector.get_map()