Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/eventloop/inputhook.py: 36%
70 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:07 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:07 +0000
1"""
2Similar to `PyOS_InputHook` of the Python API, we can plug in an input hook in
3the asyncio event loop.
5The way this works is by using a custom 'selector' that runs the other event
6loop until the real selector is ready.
8It's the responsibility of this event hook to return when there is input ready.
9There are two ways to detect when input is ready:
11The inputhook itself is a callable that receives an `InputHookContext`. This
12callable should run the other event loop, and return when the main loop has
13stuff to do. There are two ways to detect when to return:
15- Call the `input_is_ready` method periodically. Quit when this returns `True`.
17- Add the `fileno` as a watch to the external eventloop. Quit when file descriptor
18 becomes readable. (But don't read from it.)
20 Note that this is not the same as checking for `sys.stdin.fileno()`. The
21 eventloop of prompt-toolkit allows thread-based executors, for example for
22 asynchronous autocompletion. When the completion for instance is ready, we
23 also want prompt-toolkit to gain control again in order to display that.
24"""
25from __future__ import annotations
27import asyncio
28import os
29import select
30import selectors
31import sys
32import threading
33from asyncio import AbstractEventLoop, get_running_loop
34from selectors import BaseSelector, SelectorKey
35from typing import TYPE_CHECKING, Any, Callable, List, Mapping, Optional, Tuple
37__all__ = [
38 "new_eventloop_with_inputhook",
39 "set_eventloop_with_inputhook",
40 "InputHookSelector",
41 "InputHookContext",
42]
44if TYPE_CHECKING:
45 from _typeshed import FileDescriptorLike
47 _EventMask = int
50def new_eventloop_with_inputhook(
51 inputhook: Callable[[InputHookContext], None]
52) -> AbstractEventLoop:
53 """
54 Create a new event loop with the given inputhook.
55 """
56 selector = InputHookSelector(selectors.DefaultSelector(), inputhook)
57 loop = asyncio.SelectorEventLoop(selector)
58 return loop
61def set_eventloop_with_inputhook(
62 inputhook: Callable[[InputHookContext], None]
63) -> AbstractEventLoop:
64 """
65 Create a new event loop with the given inputhook, and activate it.
66 """
67 loop = new_eventloop_with_inputhook(inputhook)
68 asyncio.set_event_loop(loop)
69 return loop
72class InputHookSelector(BaseSelector):
73 """
74 Usage:
76 selector = selectors.SelectSelector()
77 loop = asyncio.SelectorEventLoop(InputHookSelector(selector, inputhook))
78 asyncio.set_event_loop(loop)
79 """
81 def __init__(
82 self, selector: BaseSelector, inputhook: Callable[[InputHookContext], None]
83 ) -> None:
84 self.selector = selector
85 self.inputhook = inputhook
86 self._r, self._w = os.pipe()
88 def register(
89 self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None
90 ) -> SelectorKey:
91 return self.selector.register(fileobj, events, data=data)
93 def unregister(self, fileobj: FileDescriptorLike) -> SelectorKey:
94 return self.selector.unregister(fileobj)
96 def modify(
97 self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None
98 ) -> SelectorKey:
99 return self.selector.modify(fileobj, events, data=None)
101 def select(
102 self, timeout: float | None = None
103 ) -> list[tuple[SelectorKey, _EventMask]]:
104 # If there are tasks in the current event loop,
105 # don't run the input hook.
106 if len(getattr(get_running_loop(), "_ready", [])) > 0:
107 return self.selector.select(timeout=timeout)
109 ready = False
110 result = None
112 # Run selector in other thread.
113 def run_selector() -> None:
114 nonlocal ready, result
115 result = self.selector.select(timeout=timeout)
116 os.write(self._w, b"x")
117 ready = True
119 th = threading.Thread(target=run_selector)
120 th.start()
122 def input_is_ready() -> bool:
123 return ready
125 # Call inputhook.
126 # The inputhook function is supposed to return when our selector
127 # becomes ready. The inputhook can do that by registering the fd in its
128 # own loop, or by checking the `input_is_ready` function regularly.
129 self.inputhook(InputHookContext(self._r, input_is_ready))
131 # Flush the read end of the pipe.
132 try:
133 # Before calling 'os.read', call select.select. This is required
134 # when the gevent monkey patch has been applied. 'os.read' is never
135 # monkey patched and won't be cooperative, so that would block all
136 # other select() calls otherwise.
137 # See: http://www.gevent.org/gevent.os.html
139 # Note: On Windows, this is apparently not an issue.
140 # However, if we would ever want to add a select call, it
141 # should use `windll.kernel32.WaitForMultipleObjects`,
142 # because `select.select` can't wait for a pipe on Windows.
143 if sys.platform != "win32":
144 select.select([self._r], [], [], None)
146 os.read(self._r, 1024)
147 except OSError:
148 # This happens when the window resizes and a SIGWINCH was received.
149 # We get 'Error: [Errno 4] Interrupted system call'
150 # Just ignore.
151 pass
153 # Wait for the real selector to be done.
154 th.join()
155 assert result is not None
156 return result
158 def close(self) -> None:
159 """
160 Clean up resources.
161 """
162 if self._r:
163 os.close(self._r)
164 os.close(self._w)
166 self._r = self._w = -1
167 self.selector.close()
169 def get_map(self) -> Mapping[FileDescriptorLike, SelectorKey]:
170 return self.selector.get_map()
173class InputHookContext:
174 """
175 Given as a parameter to the inputhook.
176 """
178 def __init__(self, fileno: int, input_is_ready: Callable[[], bool]) -> None:
179 self._fileno = fileno
180 self.input_is_ready = input_is_ready
182 def fileno(self) -> int:
183 return self._fileno