Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/prompt_toolkit/eventloop/inputhook.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

73 statements  

1""" 

2Similar to `PyOS_InputHook` of the Python API, we can plug in an input hook in 

3the asyncio event loop. 

4 

5The way this works is by using a custom 'selector' that runs the other event 

6loop until the real selector is ready. 

7 

8It's the responsibility of this event hook to return when there is input ready. 

9There are two ways to detect when input is ready: 

10 

11The inputhook itself is a callable that receives an `InputHookContext`. This 

12callable should run the other event loop, and return when the main loop has 

13stuff to do. There are two ways to detect when to return: 

14 

15- Call the `input_is_ready` method periodically. Quit when this returns `True`. 

16 

17- Add the `fileno` as a watch to the external eventloop. Quit when file descriptor 

18 becomes readable. (But don't read from it.) 

19 

20 Note that this is not the same as checking for `sys.stdin.fileno()`. The 

21 eventloop of prompt-toolkit allows thread-based executors, for example for 

22 asynchronous autocompletion. When the completion for instance is ready, we 

23 also want prompt-toolkit to gain control again in order to display that. 

24""" 

25 

26from __future__ import annotations 

27 

28import asyncio 

29import os 

30import select 

31import selectors 

32import sys 

33import threading 

34from asyncio import AbstractEventLoop, get_running_loop 

35from selectors import BaseSelector, SelectorKey 

36from typing import TYPE_CHECKING, Any, Callable, Mapping 

37 

38__all__ = [ 

39 "new_eventloop_with_inputhook", 

40 "set_eventloop_with_inputhook", 

41 "InputHookSelector", 

42 "InputHookContext", 

43 "InputHook", 

44] 

45 

46if TYPE_CHECKING: 

47 from _typeshed import FileDescriptorLike 

48 from typing_extensions import TypeAlias 

49 

50 _EventMask = int 

51 

52 

class InputHookContext:
    """
    The object that is handed to the inputhook when it gets called.

    It carries a file descriptor (exposed through `fileno()`) and an
    `input_is_ready` callable that returns a bool.
    """

    def __init__(self, fileno: int, input_is_ready: Callable[[], bool]) -> None:
        # Exposed as a plain attribute so that the hook can poll it directly.
        self.input_is_ready = input_is_ready
        # Kept private; read through `fileno()`.
        self._fileno = fileno

    def fileno(self) -> int:
        """
        Return the file descriptor that was passed to the constructor.
        """
        return self._fileno

64 

65 

# Type of an inputhook: a callable that is handed an `InputHookContext` and
# returns nothing.
InputHook: TypeAlias = Callable[[InputHookContext], None]

67 

68 

def new_eventloop_with_inputhook(
    inputhook: Callable[[InputHookContext], None],
) -> AbstractEventLoop:
    """
    Build a selector event loop whose selector runs the given inputhook.

    The platform's default selector is wrapped in an `InputHookSelector`.
    The loop is only created and returned here; it is not set as the
    current event loop.
    """
    return asyncio.SelectorEventLoop(
        InputHookSelector(selectors.DefaultSelector(), inputhook)
    )

78 

79 

def set_eventloop_with_inputhook(
    inputhook: Callable[[InputHookContext], None],
) -> AbstractEventLoop:
    """
    Create an event loop that runs the given inputhook, make it the current
    asyncio event loop, and return it.
    """
    # Deprecated!

    new_loop = new_eventloop_with_inputhook(inputhook)
    asyncio.set_event_loop(new_loop)
    return new_loop

91 

92 

class InputHookSelector(BaseSelector):
    """
    Selector that wraps another selector and runs an inputhook while the
    wrapped selector is waiting.

    Registration calls (`register`, `unregister`, `modify`, `get_map`) are
    forwarded to the wrapped selector unchanged. Only `select` adds
    behavior: it runs the wrapped selector in a thread and calls the
    inputhook in the meantime.

    Usage:

        selector = selectors.SelectSelector()
        loop = asyncio.SelectorEventLoop(InputHookSelector(selector, inputhook))
        asyncio.set_event_loop(loop)
    """

    def __init__(
        self, selector: BaseSelector, inputhook: Callable[[InputHookContext], None]
    ) -> None:
        self.selector = selector
        self.inputhook = inputhook
        # Pipe used to signal the inputhook: the selector thread writes to
        # `_w` when it is done, and `_r` is the fd handed to the inputhook.
        self._r, self._w = os.pipe()

    def register(
        self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None
    ) -> SelectorKey:
        """
        Register `fileobj` on the wrapped selector.
        """
        return self.selector.register(fileobj, events, data=data)

    def unregister(self, fileobj: FileDescriptorLike) -> SelectorKey:
        """
        Unregister `fileobj` from the wrapped selector.
        """
        return self.selector.unregister(fileobj)

    def modify(
        self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None
    ) -> SelectorKey:
        """
        Change the events and/or attached data of a registered `fileobj`.
        """
        # Forward the caller's `data`. Passing `None` here would silently
        # drop whatever the caller attached to this registration.
        return self.selector.modify(fileobj, events, data=data)

    def select(
        self, timeout: float | None = None
    ) -> list[tuple[SelectorKey, _EventMask]]:
        """
        Wait on the wrapped selector, running the inputhook in the meantime.

        :param timeout: Passed through to the wrapped selector's `select`.
        :return: The result of the wrapped selector's `select` call.
        """
        # If there are tasks in the current event loop,
        # don't run the input hook.
        if len(getattr(get_running_loop(), "_ready", [])) > 0:
            return self.selector.select(timeout=timeout)

        ready = False
        result = None

        # Run selector in other thread.
        def run_selector() -> None:
            nonlocal ready, result
            result = self.selector.select(timeout=timeout)
            # Make the read end of the pipe readable, so that an inputhook
            # that watches the fd gets notified.
            os.write(self._w, b"x")
            ready = True

        th = threading.Thread(target=run_selector)
        th.start()

        def input_is_ready() -> bool:
            return ready

        # Call inputhook.
        # The inputhook function is supposed to return when our selector
        # becomes ready. The inputhook can do that by registering the fd in its
        # own loop, or by checking the `input_is_ready` function regularly.
        self.inputhook(InputHookContext(self._r, input_is_ready))

        # Flush the read end of the pipe.
        try:
            # Before calling 'os.read', call select.select. This is required
            # when the gevent monkey patch has been applied. 'os.read' is never
            # monkey patched and won't be cooperative, so that would block all
            # other select() calls otherwise.
            # See: http://www.gevent.org/gevent.os.html

            # Note: On Windows, this is apparently not an issue.
            #       However, if we would ever want to add a select call, it
            #       should use `windll.kernel32.WaitForMultipleObjects`,
            #       because `select.select` can't wait for a pipe on Windows.
            if sys.platform != "win32":
                select.select([self._r], [], [], None)

            os.read(self._r, 1024)
        except OSError:
            # This happens when the window resizes and a SIGWINCH was received.
            # We get 'Error: [Errno 4] Interrupted system call'
            # Just ignore.
            pass

        # Wait for the real selector to be done.
        th.join()
        assert result is not None
        return result

    def close(self) -> None:
        """
        Clean up resources.

        Closes both ends of the pipe and the wrapped selector. The pipe is
        only closed once, so calling this again does not try to close the
        `-1` sentinel.
        """
        # Compare against the sentinel explicitly: `-1` is truthy (so a plain
        # truthiness check would try to close it again), and `0` is a valid
        # file descriptor that must not be skipped.
        if self._r >= 0:
            os.close(self._r)
            os.close(self._w)

        self._r = self._w = -1
        self.selector.close()

    def get_map(self) -> Mapping[FileDescriptorLike, SelectorKey]:
        """
        Return the file object to `SelectorKey` mapping of the wrapped selector.
        """
        return self.selector.get_map()