Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/eventloop/inputhook.py: 36%
72 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
1"""
2Similar to `PyOS_InputHook` of the Python API, we can plug in an input hook in
3the asyncio event loop.
5The way this works is by using a custom 'selector' that runs the other event
6loop until the real selector is ready.
8It's the responsibility of this event hook to return when there is input ready;
9the two ways to detect that are described below.
11The inputhook itself is a callable that receives an `InputHookContext`. This
12callable should run the other event loop, and return when the main loop has
13stuff to do. There are two ways to detect when to return:
15- Call the `input_is_ready` method periodically. Quit when this returns `True`.
17- Add the `fileno` as a watch to the external eventloop. Quit when file descriptor
18 becomes readable. (But don't read from it.)
20 Note that this is not the same as checking for `sys.stdin.fileno()`. The
21 eventloop of prompt-toolkit allows thread-based executors, for example for
22 asynchronous autocompletion. When the completion for instance is ready, we
23 also want prompt-toolkit to gain control again in order to display that.
24"""
25from __future__ import annotations
27import asyncio
28import os
29import select
30import selectors
31import sys
32import threading
33from asyncio import AbstractEventLoop, get_running_loop
34from selectors import BaseSelector, SelectorKey
35from typing import TYPE_CHECKING, Any, Callable, Mapping
37__all__ = [
38 "new_eventloop_with_inputhook",
39 "set_eventloop_with_inputhook",
40 "InputHookSelector",
41 "InputHookContext",
42 "InputHook",
43]
45if TYPE_CHECKING:
46 from _typeshed import FileDescriptorLike
47 from typing_extensions import TypeAlias
49 _EventMask = int
class InputHookContext:
    """
    Handle that is passed as the single argument to an inputhook.

    It gives the hook two ways to find out when it should return: watch the
    file descriptor returned by `fileno()`, or poll `input_is_ready()`.
    """

    def __init__(self, fileno: int, input_is_ready: Callable[[], bool]) -> None:
        # Exposed directly as an attribute so the hook can call
        # `context.input_is_ready()`.
        self.input_is_ready = input_is_ready
        self._fileno = fileno

    def fileno(self) -> int:
        """Return the file descriptor this context was created with."""
        return self._fileno
65InputHook: TypeAlias = Callable[[InputHookContext], None]
def new_eventloop_with_inputhook(
    inputhook: Callable[[InputHookContext], None],
) -> AbstractEventLoop:
    """
    Build (but do not install) an asyncio selector event loop whose selector
    is an `InputHookSelector` wrapping the platform's default selector.

    :param inputhook: Callable that receives an `InputHookContext`.
    :return: The newly created event loop.
    """
    return asyncio.SelectorEventLoop(
        InputHookSelector(selectors.DefaultSelector(), inputhook)
    )
def set_eventloop_with_inputhook(
    inputhook: Callable[[InputHookContext], None],
) -> AbstractEventLoop:
    """
    Build an event loop around the given inputhook and install it as the
    current asyncio event loop.

    Deprecated!

    :param inputhook: Callable that receives an `InputHookContext`.
    :return: The event loop that was created and activated.
    """
    event_loop = new_eventloop_with_inputhook(inputhook)
    asyncio.set_event_loop(event_loop)
    return event_loop
class InputHookSelector(BaseSelector):
    """
    Selector wrapper that runs an inputhook while waiting for I/O.

    Registration calls are forwarded to the wrapped selector. `select` runs
    the wrapped selector in a separate thread and calls the inputhook in the
    calling thread until the wrapped selector has returned.

    Usage:

        selector = selectors.SelectSelector()
        loop = asyncio.SelectorEventLoop(InputHookSelector(selector, inputhook))
        asyncio.set_event_loop(loop)
    """

    def __init__(
        self, selector: BaseSelector, inputhook: Callable[[InputHookContext], None]
    ) -> None:
        self.selector = selector
        self.inputhook = inputhook
        # Pipe used to signal the inputhook: the selector thread writes to
        # `_w` when it is done, and `_r` is handed to the hook as the file
        # descriptor to watch. Both are set to -1 once closed.
        self._r, self._w = os.pipe()

    def register(
        self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None
    ) -> SelectorKey:
        """Register `fileobj` with the wrapped selector."""
        return self.selector.register(fileobj, events, data=data)

    def unregister(self, fileobj: FileDescriptorLike) -> SelectorKey:
        """Unregister `fileobj` from the wrapped selector."""
        return self.selector.unregister(fileobj)

    def modify(
        self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None
    ) -> SelectorKey:
        """Change the events and/or attached data of a registered `fileobj`."""
        # Forward the caller's `data`; passing `None` here would silently
        # drop the payload attached to the key.
        return self.selector.modify(fileobj, events, data=data)

    def select(
        self, timeout: float | None = None
    ) -> list[tuple[SelectorKey, _EventMask]]:
        """
        Wait for the wrapped selector while running the inputhook.

        :param timeout: Passed unchanged to the wrapped selector's `select`.
        :return: Whatever the wrapped selector's `select` returned.
        """
        # If there are tasks in the current event loop,
        # don't run the input hook.
        if len(getattr(get_running_loop(), "_ready", [])) > 0:
            return self.selector.select(timeout=timeout)

        ready = False
        result = None

        # Run selector in other thread.
        def run_selector() -> None:
            nonlocal ready, result
            result = self.selector.select(timeout=timeout)
            # Make the read end of the pipe readable for hooks that watch
            # the file descriptor, then flip the flag for hooks that poll.
            os.write(self._w, b"x")
            ready = True

        th = threading.Thread(target=run_selector)
        th.start()

        def input_is_ready() -> bool:
            return ready

        # Call inputhook.
        # The inputhook function is supposed to return when our selector
        # becomes ready. The inputhook can do that by registering the fd in its
        # own loop, or by checking the `input_is_ready` function regularly.
        self.inputhook(InputHookContext(self._r, input_is_ready))

        # Flush the read end of the pipe.
        try:
            # Before calling 'os.read', call select.select. This is required
            # when the gevent monkey patch has been applied. 'os.read' is never
            # monkey patched and won't be cooperative, so that would block all
            # other select() calls otherwise.
            # See: http://www.gevent.org/gevent.os.html

            # Note: On Windows, this is apparently not an issue.
            #       However, if we would ever want to add a select call, it
            #       should use `windll.kernel32.WaitForMultipleObjects`,
            #       because `select.select` can't wait for a pipe on Windows.
            if sys.platform != "win32":
                select.select([self._r], [], [], None)

            os.read(self._r, 1024)
        except OSError:
            # This happens when the window resizes and a SIGWINCH was received.
            # We get 'Error: [Errno 4] Interrupted system call'
            # Just ignore.
            pass

        # Wait for the real selector to be done.
        th.join()
        assert result is not None
        return result

    def close(self) -> None:
        """
        Clean up resources.

        Closes both ends of the signalling pipe and the wrapped selector.
        Calling this more than once does not close the pipe again.
        """
        # Compare against the -1 "closed" marker instead of relying on
        # truthiness: -1 is truthy (a second call would run `os.close(-1)`)
        # and a valid descriptor 0 is falsy (it would never be closed).
        if self._r != -1:
            os.close(self._r)
            os.close(self._w)
            self._r = self._w = -1

        self.selector.close()

    def get_map(self) -> Mapping[FileDescriptorLike, SelectorKey]:
        """Return the wrapped selector's file object to key mapping."""
        return self.selector.get_map()