"""
Similar to `PyOS_InputHook` of the Python API, we can plug in an input hook in
the asyncio event loop.

The way this works is by using a custom 'selector' that runs the other event
loop until the real selector is ready.

It's the responsibility of this input hook to return when there is input ready.

The inputhook itself is a callable that receives an `InputHookContext`. This
callable should run the other event loop, and return when the main loop has
work to do. There are two ways to detect when to return:

- Call the `input_is_ready` method periodically. Quit when this returns `True`.

- Add the `fileno` as a watch to the external eventloop. Quit when the file
  descriptor becomes readable. (But don't read from it.)

  Note that this is not the same as checking for `sys.stdin.fileno()`. The
  eventloop of prompt-toolkit allows thread-based executors, for example for
  asynchronous autocompletion. When the completion for instance is ready, we
  also want prompt-toolkit to gain control again in order to display that.
"""
25
26from __future__ import annotations
27
28import asyncio
29import os
30import select
31import selectors
32import sys
33import threading
34from asyncio import AbstractEventLoop, get_running_loop
35from collections.abc import Callable, Mapping
36from selectors import BaseSelector, SelectorKey
37from typing import TYPE_CHECKING, Any
38
# Names exported by `from <module> import *`.
__all__ = [
    "new_eventloop_with_inputhook",
    "set_eventloop_with_inputhook",
    "InputHookSelector",
    "InputHookContext",
    "InputHook",
]
46
# These names are only needed by static type checkers. They are used solely
# inside annotations, which `from __future__ import annotations` keeps from
# being evaluated at runtime.
if TYPE_CHECKING:
    from typing import TypeAlias

    from _typeshed import FileDescriptorLike

    _EventMask = int
53
54
class InputHookContext:
    """
    Given as a parameter to the inputhook.

    :param fileno: File descriptor that is handed out again by `fileno()`.
    :param input_is_ready: Callable without arguments that returns a bool;
        stored as the public `input_is_ready` attribute so the inputhook can
        call it.
    """

    def __init__(self, fileno: int, input_is_ready: Callable[[], bool]) -> None:
        self._fileno = fileno
        self.input_is_ready = input_is_ready

    def fileno(self) -> int:
        """Return the file descriptor that was passed to the constructor."""
        return self._fileno
66
67
# Type of an inputhook: a callable that receives an `InputHookContext` and
# returns nothing.
InputHook: TypeAlias = Callable[[InputHookContext], None]
69
70
def new_eventloop_with_inputhook(
    inputhook: Callable[[InputHookContext], None],
) -> AbstractEventLoop:
    """
    Create a new event loop with the given inputhook.

    The platform's default selector is wrapped in an `InputHookSelector` and
    passed to a new `asyncio.SelectorEventLoop`. The loop is returned without
    being installed as the current event loop.

    :param inputhook: Callable that receives an `InputHookContext`.
    :return: The newly created event loop.
    """
    selector = InputHookSelector(selectors.DefaultSelector(), inputhook)
    loop = asyncio.SelectorEventLoop(selector)
    return loop
80
81
def set_eventloop_with_inputhook(
    inputhook: Callable[[InputHookContext], None],
) -> AbstractEventLoop:
    """
    Create a new event loop with the given inputhook, and activate it.

    The loop is built by `new_eventloop_with_inputhook` and then installed
    with `asyncio.set_event_loop`.

    :param inputhook: Callable that receives an `InputHookContext`.
    :return: The event loop that was created and installed.
    """
    # Deprecated!

    loop = new_eventloop_with_inputhook(inputhook)
    asyncio.set_event_loop(loop)
    return loop
93
94
class InputHookSelector(BaseSelector):
    """
    Selector that wraps another selector and runs an inputhook while waiting.

    Registration calls (`register`, `unregister`, `modify`, `get_map`) are
    forwarded to the wrapped selector. `select` runs the wrapped selector in
    a separate thread and calls the inputhook in the calling thread; a pipe
    is written to when the wrapped selector returns.

    Usage:

        selector = selectors.SelectSelector()
        loop = asyncio.SelectorEventLoop(InputHookSelector(selector, inputhook))
        asyncio.set_event_loop(loop)
    """

    def __init__(
        self, selector: BaseSelector, inputhook: Callable[[InputHookContext], None]
    ) -> None:
        """
        :param selector: The real selector that all calls are delegated to.
        :param inputhook: Callable that receives an `InputHookContext` and is
            invoked from `select`.
        """
        self.selector = selector
        self.inputhook = inputhook
        # The selector thread writes to `_w` when it is done; `_r` is the fd
        # that is exposed to the inputhook through `InputHookContext`.
        self._r, self._w = os.pipe()

    def register(
        self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None
    ) -> SelectorKey:
        """Register `fileobj` with the wrapped selector."""
        return self.selector.register(fileobj, events, data=data)

    def unregister(self, fileobj: FileDescriptorLike) -> SelectorKey:
        """Unregister `fileobj` from the wrapped selector."""
        return self.selector.unregister(fileobj)

    def modify(
        self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None
    ) -> SelectorKey:
        """Change the events and/or attached data of a registered `fileobj`."""
        # Forward the caller's `data`. Passing `None` here would silently
        # discard the payload attached to the registration.
        return self.selector.modify(fileobj, events, data=data)

    def select(
        self, timeout: float | None = None
    ) -> list[tuple[SelectorKey, _EventMask]]:
        """
        Wait for the wrapped selector while running the inputhook.

        :param timeout: Passed unchanged to the wrapped selector's `select`.
        :return: The result of the wrapped selector's `select`.
        """
        # If there are tasks in the current event loop,
        # don't run the input hook.
        if len(getattr(get_running_loop(), "_ready", [])) > 0:
            return self.selector.select(timeout=timeout)

        ready = False
        result = None

        # Run selector in other thread.
        def run_selector() -> None:
            nonlocal ready, result
            result = self.selector.select(timeout=timeout)
            # Make the read end of the pipe readable, so that an inputhook
            # that watches the fd notices that the selector is done.
            os.write(self._w, b"x")
            ready = True

        th = threading.Thread(target=run_selector)
        th.start()

        def input_is_ready() -> bool:
            return ready

        # Call inputhook.
        # The inputhook function is supposed to return when our selector
        # becomes ready. The inputhook can do that by registering the fd in its
        # own loop, or by checking the `input_is_ready` function regularly.
        self.inputhook(InputHookContext(self._r, input_is_ready))

        # Flush the read end of the pipe.
        try:
            # Before calling 'os.read', call select.select. This is required
            # when the gevent monkey patch has been applied. 'os.read' is never
            # monkey patched and won't be cooperative, so that would block all
            # other select() calls otherwise.
            # See: http://www.gevent.org/gevent.os.html

            # Note: On Windows, this is apparently not an issue.
            #       However, if we would ever want to add a select call, it
            #       should use `windll.kernel32.WaitForMultipleObjects`,
            #       because `select.select` can't wait for a pipe on Windows.
            if sys.platform != "win32":
                select.select([self._r], [], [], None)

            os.read(self._r, 1024)
        except OSError:
            # This happens when the window resizes and a SIGWINCH was received.
            # We get 'Error: [Errno 4] Interrupted system call'
            # Just ignore.
            pass

        # Wait for the real selector to be done.
        th.join()
        assert result is not None
        return result

    def close(self) -> None:
        """
        Clean up resources.

        Closes both ends of the pipe and the wrapped selector. The pipe is
        only closed once, so calling this method again does not try to close
        the already released file descriptors.
        """
        # Compare against the -1 "closed" sentinel explicitly: -1 is truthy
        # (a plain truth test would close it again and raise `OSError`), and
        # 0 is a valid file descriptor that a truth test would skip.
        if self._r != -1:
            os.close(self._r)
            os.close(self._w)

        self._r = self._w = -1
        self.selector.close()

    def get_map(self) -> Mapping[FileDescriptorLike, SelectorKey]:
        """Return the file object to key mapping of the wrapped selector."""
        return self.selector.get_map()