Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/eventloop/inputhook.py: 36%

70 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:07 +0000

1""" 

2Similar to `PyOS_InputHook` of the Python API, we can plug in an input hook in 

3the asyncio event loop. 

4 

5The way this works is by using a custom 'selector' that runs the other event 

6loop until the real selector is ready. 

7 

8It's the responsibility of this event hook to return when there is input ready. 

9The two ways to detect when input is ready are listed further below. 

10 

11The inputhook itself is a callable that receives an `InputHookContext`. This 

12callable should run the other event loop, and return when the main loop has 

13stuff to do. There are two ways to detect when to return: 

14 

15- Call the `input_is_ready` method periodically. Quit when this returns `True`. 

16 

17- Add the `fileno` as a watch to the external eventloop. Quit when file descriptor 

18 becomes readable. (But don't read from it.) 

19 

20 Note that this is not the same as checking for `sys.stdin.fileno()`. The 

21 eventloop of prompt-toolkit allows thread-based executors, for example for 

22 asynchronous autocompletion. When the completion for instance is ready, we 

23 also want prompt-toolkit to gain control again in order to display that. 

24""" 

25from __future__ import annotations 

26 

27import asyncio 

28import os 

29import select 

30import selectors 

31import sys 

32import threading 

33from asyncio import AbstractEventLoop, get_running_loop 

34from selectors import BaseSelector, SelectorKey 

35from typing import TYPE_CHECKING, Any, Callable, List, Mapping, Optional, Tuple 

36 

37__all__ = [ 

38 "new_eventloop_with_inputhook", 

39 "set_eventloop_with_inputhook", 

40 "InputHookSelector", 

41 "InputHookContext", 

42] 

43 

44if TYPE_CHECKING: 

45 from _typeshed import FileDescriptorLike 

46 

47 _EventMask = int 

48 

49 

def new_eventloop_with_inputhook(
    inputhook: Callable[[InputHookContext], None]
) -> AbstractEventLoop:
    """
    Build (but do not install) an asyncio selector event loop whose selector
    is an `InputHookSelector` wrapping the platform default selector.

    :param inputhook: Callable that is handed to the `InputHookSelector`.
    :return: The newly created `asyncio.SelectorEventLoop`.
    """
    return asyncio.SelectorEventLoop(
        InputHookSelector(selectors.DefaultSelector(), inputhook)
    )

59 

60 

def set_eventloop_with_inputhook(
    inputhook: Callable[[InputHookContext], None]
) -> AbstractEventLoop:
    """
    Build an event loop around the given inputhook and install it with
    `asyncio.set_event_loop`.

    :param inputhook: Callable forwarded to `new_eventloop_with_inputhook`.
    :return: The event loop that was created and installed.
    """
    event_loop = new_eventloop_with_inputhook(inputhook)
    asyncio.set_event_loop(event_loop)
    return event_loop

70 

71 

class InputHookSelector(BaseSelector):
    """
    Selector that wraps another selector and runs an inputhook while the
    wrapped selector is waiting.

    Usage:

        selector = selectors.SelectSelector()
        loop = asyncio.SelectorEventLoop(InputHookSelector(selector, inputhook))
        asyncio.set_event_loop(loop)
    """

    def __init__(
        self, selector: BaseSelector, inputhook: Callable[[InputHookContext], None]
    ) -> None:
        """
        :param selector: The real selector; all registrations are delegated
            to it.
        :param inputhook: Callable invoked from `select` with an
            `InputHookContext`.
        """
        self.selector = selector
        self.inputhook = inputhook
        # Pipe used to signal the inputhook: the selector thread writes to
        # `_w` when the wrapped selector returns; `_r` is handed to the
        # inputhook through the `InputHookContext`.
        self._r, self._w = os.pipe()

    def register(
        self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None
    ) -> SelectorKey:
        """Delegate to the wrapped selector's `register`."""
        return self.selector.register(fileobj, events, data=data)

    def unregister(self, fileobj: FileDescriptorLike) -> SelectorKey:
        """Delegate to the wrapped selector's `unregister`."""
        return self.selector.unregister(fileobj)

    def modify(
        self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None
    ) -> SelectorKey:
        """Delegate to the wrapped selector's `modify`."""
        # Forward the caller's `data`. Passing a hard-coded `None` here would
        # silently drop the data attached to the registration.
        return self.selector.modify(fileobj, events, data=data)

    def select(
        self, timeout: float | None = None
    ) -> list[tuple[SelectorKey, _EventMask]]:
        """
        Wait for the wrapped selector, running the inputhook in the meantime.

        When the running loop already has entries in its `_ready` attribute,
        the wrapped selector is called directly and the inputhook is skipped.
        Otherwise the wrapped selector runs in a separate thread while the
        inputhook runs in the calling thread.

        :param timeout: Passed through to the wrapped selector's `select`.
        :return: The result of the wrapped selector's `select`.
        """
        # If there are tasks in the current event loop,
        # don't run the input hook.
        if len(getattr(get_running_loop(), "_ready", [])) > 0:
            return self.selector.select(timeout=timeout)

        ready = False
        result = None

        # Run selector in other thread.
        def run_selector() -> None:
            nonlocal ready, result
            result = self.selector.select(timeout=timeout)
            # Make the read end of the pipe readable so an inputhook that
            # watches the fd wakes up.
            os.write(self._w, b"x")
            ready = True

        th = threading.Thread(target=run_selector)
        th.start()

        def input_is_ready() -> bool:
            return ready

        # Call inputhook.
        # The inputhook function is supposed to return when our selector
        # becomes ready. The inputhook can do that by registering the fd in its
        # own loop, or by checking the `input_is_ready` function regularly.
        self.inputhook(InputHookContext(self._r, input_is_ready))

        # Flush the read end of the pipe.
        try:
            # Before calling 'os.read', call select.select. This is required
            # when the gevent monkey patch has been applied. 'os.read' is never
            # monkey patched and won't be cooperative, so that would block all
            # other select() calls otherwise.
            # See: http://www.gevent.org/gevent.os.html

            # Note: On Windows, this is apparently not an issue.
            #       However, if we would ever want to add a select call, it
            #       should use `windll.kernel32.WaitForMultipleObjects`,
            #       because `select.select` can't wait for a pipe on Windows.
            if sys.platform != "win32":
                select.select([self._r], [], [], None)

            os.read(self._r, 1024)
        except OSError:
            # This happens when the window resizes and a SIGWINCH was received.
            # We get 'Error: [Errno 4] Interrupted system call'
            # Just ignore.
            pass

        # Wait for the real selector to be done.
        th.join()
        assert result is not None
        return result

    def close(self) -> None:
        """
        Clean up resources: close both pipe ends (once) and close the
        wrapped selector.
        """
        # Compare against the -1 sentinel rather than testing truthiness:
        # -1 is truthy (a second close() would call os.close(-1) and raise),
        # and a valid descriptor 0 is falsy (the pipe would never be closed).
        if self._r != -1:
            os.close(self._r)
            os.close(self._w)
            self._r = self._w = -1

        self.selector.close()

    def get_map(self) -> Mapping[FileDescriptorLike, SelectorKey]:
        """Delegate to the wrapped selector's `get_map`."""
        return self.selector.get_map()

171 

172 

class InputHookContext:
    """
    Object handed to the inputhook callable.

    It carries a file descriptor (available through `fileno()`) and an
    `input_is_ready` callable returning a bool.
    """

    def __init__(self, fileno: int, input_is_ready: Callable[[], bool]) -> None:
        """
        :param fileno: File descriptor reported by `fileno()`.
        :param input_is_ready: Zero-argument callable returning a bool; stored
            unchanged as the `input_is_ready` attribute.
        """
        self.input_is_ready = input_is_ready
        self._fileno = fileno

    def fileno(self) -> int:
        """Return the file descriptor given at construction time."""
        descriptor = self._fileno
        return descriptor