1"""
2Similar to `PyOS_InputHook` of the Python API, we can plug in an input hook in
3the asyncio event loop.
4
5The way this works is by using a custom 'selector' that runs the other event
6loop until the real selector is ready.
7
8It's the responsibility of this event hook to return when there is input ready.
9There are two ways to detect when input is ready:
10
11The inputhook itself is a callable that receives an `InputHookContext`. This
12callable should run the other event loop, and return when the main loop has
13stuff to do. There are two ways to detect when to return:
14
15- Call the `input_is_ready` method periodically. Quit when this returns `True`.
16
17- Add the `fileno` as a watch to the external eventloop. Quit when file descriptor
18 becomes readable. (But don't read from it.)
19
20 Note that this is not the same as checking for `sys.stdin.fileno()`. The
21 eventloop of prompt-toolkit allows thread-based executors, for example for
22 asynchronous autocompletion. When the completion for instance is ready, we
23 also want prompt-toolkit to gain control again in order to display that.
24"""
25
26from __future__ import annotations
27
28import asyncio
29import os
30import select
31import selectors
32import sys
33import threading
34from asyncio import AbstractEventLoop, get_running_loop
35from selectors import BaseSelector, SelectorKey
36from typing import TYPE_CHECKING, Any, Callable, Mapping
37
38__all__ = [
39 "new_eventloop_with_inputhook",
40 "set_eventloop_with_inputhook",
41 "InputHookSelector",
42 "InputHookContext",
43 "InputHook",
44]
45
46if TYPE_CHECKING:
47 from _typeshed import FileDescriptorLike
48 from typing_extensions import TypeAlias
49
50 _EventMask = int
51
52
53class InputHookContext:
54 """
55 Given as a parameter to the inputhook.
56 """
57
58 def __init__(self, fileno: int, input_is_ready: Callable[[], bool]) -> None:
59 self._fileno = fileno
60 self.input_is_ready = input_is_ready
61
62 def fileno(self) -> int:
63 return self._fileno
64
65
66InputHook: TypeAlias = Callable[[InputHookContext], None]
67
68
69def new_eventloop_with_inputhook(
70 inputhook: Callable[[InputHookContext], None],
71) -> AbstractEventLoop:
72 """
73 Create a new event loop with the given inputhook.
74 """
75 selector = InputHookSelector(selectors.DefaultSelector(), inputhook)
76 loop = asyncio.SelectorEventLoop(selector)
77 return loop
78
79
80def set_eventloop_with_inputhook(
81 inputhook: Callable[[InputHookContext], None],
82) -> AbstractEventLoop:
83 """
84 Create a new event loop with the given inputhook, and activate it.
85 """
86 # Deprecated!
87
88 loop = new_eventloop_with_inputhook(inputhook)
89 asyncio.set_event_loop(loop)
90 return loop
91
92
93class InputHookSelector(BaseSelector):
94 """
95 Usage:
96
97 selector = selectors.SelectSelector()
98 loop = asyncio.SelectorEventLoop(InputHookSelector(selector, inputhook))
99 asyncio.set_event_loop(loop)
100 """
101
102 def __init__(
103 self, selector: BaseSelector, inputhook: Callable[[InputHookContext], None]
104 ) -> None:
105 self.selector = selector
106 self.inputhook = inputhook
107 self._r, self._w = os.pipe()
108
109 def register(
110 self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None
111 ) -> SelectorKey:
112 return self.selector.register(fileobj, events, data=data)
113
114 def unregister(self, fileobj: FileDescriptorLike) -> SelectorKey:
115 return self.selector.unregister(fileobj)
116
117 def modify(
118 self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None
119 ) -> SelectorKey:
120 return self.selector.modify(fileobj, events, data=None)
121
122 def select(
123 self, timeout: float | None = None
124 ) -> list[tuple[SelectorKey, _EventMask]]:
125 # If there are tasks in the current event loop,
126 # don't run the input hook.
127 if len(getattr(get_running_loop(), "_ready", [])) > 0:
128 return self.selector.select(timeout=timeout)
129
130 ready = False
131 result = None
132
133 # Run selector in other thread.
134 def run_selector() -> None:
135 nonlocal ready, result
136 result = self.selector.select(timeout=timeout)
137 os.write(self._w, b"x")
138 ready = True
139
140 th = threading.Thread(target=run_selector)
141 th.start()
142
143 def input_is_ready() -> bool:
144 return ready
145
146 # Call inputhook.
147 # The inputhook function is supposed to return when our selector
148 # becomes ready. The inputhook can do that by registering the fd in its
149 # own loop, or by checking the `input_is_ready` function regularly.
150 self.inputhook(InputHookContext(self._r, input_is_ready))
151
152 # Flush the read end of the pipe.
153 try:
154 # Before calling 'os.read', call select.select. This is required
155 # when the gevent monkey patch has been applied. 'os.read' is never
156 # monkey patched and won't be cooperative, so that would block all
157 # other select() calls otherwise.
158 # See: http://www.gevent.org/gevent.os.html
159
160 # Note: On Windows, this is apparently not an issue.
161 # However, if we would ever want to add a select call, it
162 # should use `windll.kernel32.WaitForMultipleObjects`,
163 # because `select.select` can't wait for a pipe on Windows.
164 if sys.platform != "win32":
165 select.select([self._r], [], [], None)
166
167 os.read(self._r, 1024)
168 except OSError:
169 # This happens when the window resizes and a SIGWINCH was received.
170 # We get 'Error: [Errno 4] Interrupted system call'
171 # Just ignore.
172 pass
173
174 # Wait for the real selector to be done.
175 th.join()
176 assert result is not None
177 return result
178
179 def close(self) -> None:
180 """
181 Clean up resources.
182 """
183 if self._r:
184 os.close(self._r)
185 os.close(self._w)
186
187 self._r = self._w = -1
188 self.selector.close()
189
190 def get_map(self) -> Mapping[FileDescriptorLike, SelectorKey]:
191 return self.selector.get_map()