1""" 
    2Similar to `PyOS_InputHook` of the Python API, we can plug in an input hook in 
    3the asyncio event loop. 
    4 
    5The way this works is by using a custom 'selector' that runs the other event 
    6loop until the real selector is ready. 
    7 
It's the responsibility of this input hook to return when there is input ready.
    10 
    11The inputhook itself is a callable that receives an `InputHookContext`. This 
    12callable should run the other event loop, and return when the main loop has 
    13stuff to do. There are two ways to detect when to return: 
    14 
    15- Call the `input_is_ready` method periodically. Quit when this returns `True`. 
    16 
    17- Add the `fileno` as a watch to the external eventloop. Quit when file descriptor 
    18  becomes readable. (But don't read from it.) 
    19 
    20  Note that this is not the same as checking for `sys.stdin.fileno()`. The 
    21  eventloop of prompt-toolkit allows thread-based executors, for example for 
    22  asynchronous autocompletion. When the completion for instance is ready, we 
    23  also want prompt-toolkit to gain control again in order to display that. 
    24""" 
    25 
    26from __future__ import annotations 
    27 
    28import asyncio 
    29import os 
    30import select 
    31import selectors 
    32import sys 
    33import threading 
    34from asyncio import AbstractEventLoop, get_running_loop 
    35from selectors import BaseSelector, SelectorKey 
    36from typing import TYPE_CHECKING, Any, Callable, Mapping 
    37 
# Names exported by `from <this module> import *`; this is the public API.
__all__ = [
    "new_eventloop_with_inputhook",
    "set_eventloop_with_inputhook",
    "InputHookSelector",
    "InputHookContext",
    "InputHook",
]
    45 
# These names are only used inside annotations. Because of
# `from __future__ import annotations`, annotations are not evaluated at
# runtime, so the imports below are only needed by static type checkers.
if TYPE_CHECKING:
    from _typeshed import FileDescriptorLike
    from typing_extensions import TypeAlias

    # Type of the `events` bitmask accepted by `register` and `modify`.
    _EventMask = int
    51 
    52 
    53class InputHookContext: 
    54    """ 
    55    Given as a parameter to the inputhook. 
    56    """ 
    57 
    58    def __init__(self, fileno: int, input_is_ready: Callable[[], bool]) -> None: 
    59        self._fileno = fileno 
    60        self.input_is_ready = input_is_ready 
    61 
    62    def fileno(self) -> int: 
    63        return self._fileno 
    64 
    65 
# Signature of an inputhook: a callable that receives an `InputHookContext`
# and returns nothing.
InputHook: TypeAlias = Callable[[InputHookContext], None]
    67 
    68 
    69def new_eventloop_with_inputhook( 
    70    inputhook: Callable[[InputHookContext], None], 
    71) -> AbstractEventLoop: 
    72    """ 
    73    Create a new event loop with the given inputhook. 
    74    """ 
    75    selector = InputHookSelector(selectors.DefaultSelector(), inputhook) 
    76    loop = asyncio.SelectorEventLoop(selector) 
    77    return loop 
    78 
    79 
    80def set_eventloop_with_inputhook( 
    81    inputhook: Callable[[InputHookContext], None], 
    82) -> AbstractEventLoop: 
    83    """ 
    84    Create a new event loop with the given inputhook, and activate it. 
    85    """ 
    86    # Deprecated! 
    87 
    88    loop = new_eventloop_with_inputhook(inputhook) 
    89    asyncio.set_event_loop(loop) 
    90    return loop 
    91 
    92 
    93class InputHookSelector(BaseSelector): 
    94    """ 
    95    Usage: 
    96 
    97        selector = selectors.SelectSelector() 
    98        loop = asyncio.SelectorEventLoop(InputHookSelector(selector, inputhook)) 
    99        asyncio.set_event_loop(loop) 
    100    """ 
    101 
    102    def __init__( 
    103        self, selector: BaseSelector, inputhook: Callable[[InputHookContext], None] 
    104    ) -> None: 
    105        self.selector = selector 
    106        self.inputhook = inputhook 
    107        self._r, self._w = os.pipe() 
    108 
    109    def register( 
    110        self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None 
    111    ) -> SelectorKey: 
    112        return self.selector.register(fileobj, events, data=data) 
    113 
    114    def unregister(self, fileobj: FileDescriptorLike) -> SelectorKey: 
    115        return self.selector.unregister(fileobj) 
    116 
    117    def modify( 
    118        self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None 
    119    ) -> SelectorKey: 
    120        return self.selector.modify(fileobj, events, data=None) 
    121 
    122    def select( 
    123        self, timeout: float | None = None 
    124    ) -> list[tuple[SelectorKey, _EventMask]]: 
    125        # If there are tasks in the current event loop, 
    126        # don't run the input hook. 
    127        if len(getattr(get_running_loop(), "_ready", [])) > 0: 
    128            return self.selector.select(timeout=timeout) 
    129 
    130        ready = False 
    131        result = None 
    132 
    133        # Run selector in other thread. 
    134        def run_selector() -> None: 
    135            nonlocal ready, result 
    136            result = self.selector.select(timeout=timeout) 
    137            os.write(self._w, b"x") 
    138            ready = True 
    139 
    140        th = threading.Thread(target=run_selector) 
    141        th.start() 
    142 
    143        def input_is_ready() -> bool: 
    144            return ready 
    145 
    146        # Call inputhook. 
    147        # The inputhook function is supposed to return when our selector 
    148        # becomes ready. The inputhook can do that by registering the fd in its 
    149        # own loop, or by checking the `input_is_ready` function regularly. 
    150        self.inputhook(InputHookContext(self._r, input_is_ready)) 
    151 
    152        # Flush the read end of the pipe. 
    153        try: 
    154            # Before calling 'os.read', call select.select. This is required 
    155            # when the gevent monkey patch has been applied. 'os.read' is never 
    156            # monkey patched and won't be cooperative, so that would block all 
    157            # other select() calls otherwise. 
    158            # See: http://www.gevent.org/gevent.os.html 
    159 
    160            # Note: On Windows, this is apparently not an issue. 
    161            #       However, if we would ever want to add a select call, it 
    162            #       should use `windll.kernel32.WaitForMultipleObjects`, 
    163            #       because `select.select` can't wait for a pipe on Windows. 
    164            if sys.platform != "win32": 
    165                select.select([self._r], [], [], None) 
    166 
    167            os.read(self._r, 1024) 
    168        except OSError: 
    169            # This happens when the window resizes and a SIGWINCH was received. 
    170            # We get 'Error: [Errno 4] Interrupted system call' 
    171            # Just ignore. 
    172            pass 
    173 
    174        # Wait for the real selector to be done. 
    175        th.join() 
    176        assert result is not None 
    177        return result 
    178 
    179    def close(self) -> None: 
    180        """ 
    181        Clean up resources. 
    182        """ 
    183        if self._r: 
    184            os.close(self._r) 
    185            os.close(self._w) 
    186 
    187        self._r = self._w = -1 
    188        self.selector.close() 
    189 
    190    def get_map(self) -> Mapping[FileDescriptorLike, SelectorKey]: 
    191        return self.selector.get_map()