1from __future__ import annotations 
    2 
    3import asyncio 
    4import contextvars 
    5import sys 
    6import time 
    7from asyncio import get_running_loop 
    8from types import TracebackType 
    9from typing import Any, Awaitable, Callable, TypeVar, cast 
    10 
# Names exported by this module.
__all__ = [
    "run_in_executor_with_context",
    "call_soon_threadsafe",
    "get_traceback_from_context",
]

# Return type of the callable handed to `run_in_executor_with_context`.
_T = TypeVar("_T")
    18 
    19 
    20def run_in_executor_with_context( 
    21    func: Callable[..., _T], 
    22    *args: Any, 
    23    loop: asyncio.AbstractEventLoop | None = None, 
    24) -> Awaitable[_T]: 
    25    """ 
    26    Run a function in an executor, but make sure it uses the same contextvars. 
    27    This is required so that the function will see the right application. 
    28 
    29    See also: https://bugs.python.org/issue34014 
    30    """ 
    31    loop = loop or get_running_loop() 
    32    ctx: contextvars.Context = contextvars.copy_context() 
    33 
    34    return loop.run_in_executor(None, ctx.run, func, *args) 
    35 
    36 
    37def call_soon_threadsafe( 
    38    func: Callable[[], None], 
    39    max_postpone_time: float | None = None, 
    40    loop: asyncio.AbstractEventLoop | None = None, 
    41) -> None: 
    42    """ 
    43    Wrapper around asyncio's `call_soon_threadsafe`. 
    44 
    45    This takes a `max_postpone_time` which can be used to tune the urgency of 
    46    the method. 
    47 
    48    Asyncio runs tasks in first-in-first-out. However, this is not what we 
    49    want for the render function of the prompt_toolkit UI. Rendering is 
    50    expensive, but since the UI is invalidated very often, in some situations 
    51    we render the UI too often, so much that the rendering CPU usage slows down 
    52    the rest of the processing of the application.  (Pymux is an example where 
    53    we have to balance the CPU time spend on rendering the UI, and parsing 
    54    process output.) 
    55    However, we want to set a deadline value, for when the rendering should 
    56    happen. (The UI should stay responsive). 
    57    """ 
    58    loop2 = loop or get_running_loop() 
    59 
    60    # If no `max_postpone_time` has been given, schedule right now. 
    61    if max_postpone_time is None: 
    62        loop2.call_soon_threadsafe(func) 
    63        return 
    64 
    65    max_postpone_until = time.time() + max_postpone_time 
    66 
    67    def schedule() -> None: 
    68        # When there are no other tasks scheduled in the event loop. Run it 
    69        # now. 
    70        # Notice: uvloop doesn't have this _ready attribute. In that case, 
    71        #         always call immediately. 
    72        if not getattr(loop2, "_ready", []): 
    73            func() 
    74            return 
    75 
    76        # If the timeout expired, run this now. 
    77        if time.time() > max_postpone_until: 
    78            func() 
    79            return 
    80 
    81        # Schedule again for later. 
    82        loop2.call_soon_threadsafe(schedule) 
    83 
    84    loop2.call_soon_threadsafe(schedule) 
    85 
    86 
    87def get_traceback_from_context(context: dict[str, Any]) -> TracebackType | None: 
    88    """ 
    89    Get the traceback object from the context. 
    90    """ 
    91    exception = context.get("exception") 
    92    if exception: 
    93        if hasattr(exception, "__traceback__"): 
    94            return cast(TracebackType, exception.__traceback__) 
    95        else: 
    96            # call_exception_handler() is usually called indirectly 
    97            # from an except block. If it's not the case, the traceback 
    98            # is undefined... 
    99            return sys.exc_info()[2] 
    100 
    101    return None