Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/eventloop/inputhook.py: 36%

72 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-20 06:09 +0000

1""" 

2Similar to `PyOS_InputHook` of the Python API, we can plug in an input hook in 

3the asyncio event loop. 

4 

5The way this works is by using a custom 'selector' that runs the other event 

6loop until the real selector is ready. 

7 

8It's the responsibility of this event hook to return when there is input ready. 

9There are two ways to detect when input is ready: 

10 

11The inputhook itself is a callable that receives an `InputHookContext`. This 

12callable should run the other event loop, and return when the main loop has 

13stuff to do. There are two ways to detect when to return: 

14 

15- Call the `input_is_ready` method periodically. Quit when this returns `True`. 

16 

17- Add the `fileno` as a watch to the external eventloop. Quit when the file descriptor 

18 becomes readable. (But don't read from it.) 

19 

20 Note that this is not the same as checking for `sys.stdin.fileno()`. The 

21 eventloop of prompt-toolkit allows thread-based executors, for example for 

22 asynchronous autocompletion. When the completion for instance is ready, we 

23 also want prompt-toolkit to gain control again in order to display that. 

24""" 

25from __future__ import annotations 

26 

27import asyncio 

28import os 

29import select 

30import selectors 

31import sys 

32import threading 

33from asyncio import AbstractEventLoop, get_running_loop 

34from selectors import BaseSelector, SelectorKey 

35from typing import TYPE_CHECKING, Any, Callable, Mapping 

36 

37__all__ = [ 

38 "new_eventloop_with_inputhook", 

39 "set_eventloop_with_inputhook", 

40 "InputHookSelector", 

41 "InputHookContext", 

42 "InputHook", 

43] 

44 

45if TYPE_CHECKING: 

46 from _typeshed import FileDescriptorLike 

47 from typing_extensions import TypeAlias 

48 

49 _EventMask = int 

50 

51 

class InputHookContext:
    """
    Object handed to an inputhook when it is called.

    It carries the two things a hook can use to decide when to return: a file
    descriptor (see `fileno`) and the `input_is_ready` callable.
    """

    def __init__(self, fileno: int, input_is_ready: Callable[[], bool]) -> None:
        # Public attribute: a zero-argument callable returning a bool.
        self.input_is_ready = input_is_ready
        # Kept private; exposed through the `fileno()` method.
        self._fileno = fileno

    def fileno(self) -> int:
        """Return the file descriptor that was passed to the constructor."""
        return self._fileno


# An inputhook is any callable that accepts an `InputHookContext` and returns
# nothing.
InputHook: TypeAlias = Callable[[InputHookContext], None]

66 

67 

def new_eventloop_with_inputhook(
    inputhook: Callable[[InputHookContext], None],
) -> AbstractEventLoop:
    """
    Build a new asyncio selector event loop that uses the given inputhook.

    The platform's default selector is wrapped in an `InputHookSelector`
    together with `inputhook`. The loop is returned as-is; it is not installed
    as the current event loop.
    """
    return asyncio.SelectorEventLoop(
        InputHookSelector(selectors.DefaultSelector(), inputhook)
    )

77 

78 

def set_eventloop_with_inputhook(
    inputhook: Callable[[InputHookContext], None],
) -> AbstractEventLoop:
    """
    Create a new event loop with the given inputhook, install it with
    `asyncio.set_event_loop`, and return it.

    Deprecated!
    """
    new_loop = new_eventloop_with_inputhook(inputhook)
    asyncio.set_event_loop(new_loop)
    return new_loop

90 

91 

class InputHookSelector(BaseSelector):
    """
    Selector that wraps another selector and runs an inputhook while waiting.

    Registration calls (`register`, `unregister`, `modify`, `get_map`) are
    forwarded to the wrapped selector. `select` runs the wrapped selector in a
    helper thread and calls the inputhook in the calling thread; an internal
    pipe is written to when the wrapped selector has returned.

    Usage:

        selector = selectors.SelectSelector()
        loop = asyncio.SelectorEventLoop(InputHookSelector(selector, inputhook))
        asyncio.set_event_loop(loop)
    """

    def __init__(
        self, selector: BaseSelector, inputhook: Callable[[InputHookContext], None]
    ) -> None:
        self.selector = selector
        self.inputhook = inputhook
        # Pipe used to tell the inputhook that the wrapped selector is done:
        # the helper thread writes to `_w`, the hook may watch `_r`.
        self._r, self._w = os.pipe()

    def register(
        self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None
    ) -> SelectorKey:
        """Register `fileobj` on the wrapped selector and return its key."""
        return self.selector.register(fileobj, events, data=data)

    def unregister(self, fileobj: FileDescriptorLike) -> SelectorKey:
        """Unregister `fileobj` from the wrapped selector and return its key."""
        return self.selector.unregister(fileobj)

    def modify(
        self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None
    ) -> SelectorKey:
        """
        Change the events and/or attached data of a registered `fileobj`.

        The caller's `data` is forwarded to the wrapped selector, exactly like
        `register` does.
        """
        # Forward `data` itself; passing a literal None here would silently
        # discard the payload the caller attached to the key.
        return self.selector.modify(fileobj, events, data=data)

    def select(
        self, timeout: float | None = None
    ) -> list[tuple[SelectorKey, _EventMask]]:
        """
        Wait on the wrapped selector, running the inputhook in the meantime.

        :param timeout: Passed unchanged to the wrapped selector's `select`.
        :return: The list of `(key, events)` tuples produced by the wrapped
            selector.
        """
        # If there are tasks in the current event loop,
        # don't run the input hook.
        if len(getattr(get_running_loop(), "_ready", [])) > 0:
            return self.selector.select(timeout=timeout)

        ready = False
        result = None

        # Run selector in other thread.
        def run_selector() -> None:
            nonlocal ready, result
            result = self.selector.select(timeout=timeout)
            # Wake up an inputhook that is watching the read end of the pipe.
            os.write(self._w, b"x")
            ready = True

        th = threading.Thread(target=run_selector)
        th.start()

        def input_is_ready() -> bool:
            return ready

        # Call inputhook.
        # The inputhook function is supposed to return when our selector
        # becomes ready. The inputhook can do that by registering the fd in its
        # own loop, or by checking the `input_is_ready` function regularly.
        self.inputhook(InputHookContext(self._r, input_is_ready))

        # Flush the read end of the pipe.
        try:
            # Before calling 'os.read', call select.select. This is required
            # when the gevent monkey patch has been applied. 'os.read' is never
            # monkey patched and won't be cooperative, so that would block all
            # other select() calls otherwise.
            # See: http://www.gevent.org/gevent.os.html

            # Note: On Windows, this is apparently not an issue.
            #       However, if we would ever want to add a select call, it
            #       should use `windll.kernel32.WaitForMultipleObjects`,
            #       because `select.select` can't wait for a pipe on Windows.
            if sys.platform != "win32":
                select.select([self._r], [], [], None)

            os.read(self._r, 1024)
        except OSError:
            # This happens when the window resizes and a SIGWINCH was received.
            # We get 'Error: [Errno 4] Interrupted system call'
            # Just ignore.
            pass

        # Wait for the real selector to be done.
        th.join()
        assert result is not None
        return result

    def close(self) -> None:
        """
        Clean up resources.

        Closes both ends of the internal pipe (once) and closes the wrapped
        selector. Calling this again after the pipe is closed does not try to
        close the descriptors a second time.
        """
        # Valid descriptors are >= 0 (0 included); -1 is the "already closed"
        # marker set below. A plain truthiness test gets both cases wrong.
        if self._r >= 0:
            os.close(self._r)
            os.close(self._w)

        self._r = self._w = -1
        self.selector.close()

    def get_map(self) -> Mapping[FileDescriptorLike, SelectorKey]:
        """Return the wrapped selector's mapping of file objects to keys."""
        return self.selector.get_map()

190 return self.selector.get_map()