1from __future__ import annotations
2
3import asyncio
4import contextvars
5import sys
6import time
7from asyncio import get_running_loop
8from collections.abc import Awaitable, Callable
9from types import TracebackType
10from typing import Any, TypeVar, cast
11
12__all__ = [
13 "run_in_executor_with_context",
14 "call_soon_threadsafe",
15 "get_traceback_from_context",
16]
17
18_T = TypeVar("_T")
19
20
21def run_in_executor_with_context(
22 func: Callable[..., _T],
23 *args: Any,
24 loop: asyncio.AbstractEventLoop | None = None,
25) -> Awaitable[_T]:
26 """
27 Run a function in an executor, but make sure it uses the same contextvars.
28 This is required so that the function will see the right application.
29
30 See also: https://bugs.python.org/issue34014
31 """
32 loop = loop or get_running_loop()
33 ctx: contextvars.Context = contextvars.copy_context()
34
35 return loop.run_in_executor(None, ctx.run, func, *args)
36
37
38def call_soon_threadsafe(
39 func: Callable[[], None],
40 max_postpone_time: float | None = None,
41 loop: asyncio.AbstractEventLoop | None = None,
42) -> None:
43 """
44 Wrapper around asyncio's `call_soon_threadsafe`.
45
46 This takes a `max_postpone_time` which can be used to tune the urgency of
47 the method.
48
49 Asyncio runs tasks in first-in-first-out. However, this is not what we
50 want for the render function of the prompt_toolkit UI. Rendering is
51 expensive, but since the UI is invalidated very often, in some situations
52 we render the UI too often, so much that the rendering CPU usage slows down
53 the rest of the processing of the application. (Pymux is an example where
54 we have to balance the CPU time spend on rendering the UI, and parsing
55 process output.)
56 However, we want to set a deadline value, for when the rendering should
57 happen. (The UI should stay responsive).
58 """
59 loop2 = loop or get_running_loop()
60
61 # If no `max_postpone_time` has been given, schedule right now.
62 if max_postpone_time is None:
63 loop2.call_soon_threadsafe(func)
64 return
65
66 max_postpone_until = time.time() + max_postpone_time
67
68 def schedule() -> None:
69 # When there are no other tasks scheduled in the event loop. Run it
70 # now.
71 # Notice: uvloop doesn't have this _ready attribute. In that case,
72 # always call immediately.
73 if not getattr(loop2, "_ready", []):
74 func()
75 return
76
77 # If the timeout expired, run this now.
78 if time.time() > max_postpone_until:
79 func()
80 return
81
82 # Schedule again for later.
83 loop2.call_soon_threadsafe(schedule)
84
85 loop2.call_soon_threadsafe(schedule)
86
87
88def get_traceback_from_context(context: dict[str, Any]) -> TracebackType | None:
89 """
90 Get the traceback object from the context.
91 """
92 exception = context.get("exception")
93 if exception:
94 if hasattr(exception, "__traceback__"):
95 return cast(TracebackType, exception.__traceback__)
96 else:
97 # call_exception_handler() is usually called indirectly
98 # from an except block. If it's not the case, the traceback
99 # is undefined...
100 return sys.exc_info()[2]
101
102 return None