Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/eventloop/async_generator.py: 29%
51 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
1"""
2Implementation for async generators.
3"""
4from __future__ import annotations
6from asyncio import get_running_loop
7from contextlib import asynccontextmanager
8from queue import Empty, Full, Queue
9from typing import Any, AsyncGenerator, Callable, Iterable, TypeVar
11from .utils import run_in_executor_with_context
13__all__ = [
14 "aclosing",
15 "generator_to_async_generator",
16]
# Type variable so that `aclosing` hands back exactly the generator type it
# was given.
_T_Generator = TypeVar(
    "_T_Generator",
    bound=AsyncGenerator[Any, None],
)


@asynccontextmanager
async def aclosing(
    thing: _T_Generator,
) -> AsyncGenerator[_T_Generator, None]:
    """
    Async context manager that closes `thing` on exit.

    Similar to `contextlib.aclosing`, in Python 3.10.

    :param thing: Object exposing an awaitable ``aclose()`` method; it is
        yielded unchanged as the ``as`` target.
    """
    try:
        yield thing
    finally:
        # Runs on normal exit as well as when the body raised.
        await thing.aclose()
# Default size of the queue that sits between a synchronous producer (for
# instance a generator of completions) and the async consumer.
#
# The size is a trade-off:
# - Very small (like 1): consuming a large number of items becomes really
#   slow.
# - Unlimited or too big: memory can be over-consumed, and CPU time is spent
#   producing items that are no longer needed if the async consumer stops at
#   some point.
#
# A fixed size gives back pressure from the async consumer to the sync
# producer. With around 50k completions, measurements show that 1000 is still
# significantly faster than a buffer of 100, hence this default.
DEFAULT_BUFFER_SIZE: int = 1_000

# Item type carried from the synchronous iterable to the async generator.
_T = TypeVar("_T")
class _Done:
    """Marker type: an instance placed on the queue signals "no more items"."""
async def generator_to_async_generator(
    get_iterable: Callable[[], Iterable[_T]],
    buffer_size: int = DEFAULT_BUFFER_SIZE,
) -> AsyncGenerator[_T, None]:
    """
    Turn a generator or iterable into an async generator.

    This works by running the generator in a background thread. Items are
    handed over through a bounded queue; a `_Done` instance on that queue
    marks the end of the stream.

    :param get_iterable: Function that returns a generator or iterable when
        called.
    :param buffer_size: Size of the queue between the async consumer and the
        synchronous generator that produces items.
    """
    quitting = False
    # NOTE: We are limiting the queue size in order to have back-pressure.
    q: Queue[_T | _Done] = Queue(maxsize=buffer_size)
    loop = get_running_loop()

    def put_unless_quitting(value: _T | _Done) -> bool:
        """
        Push `value` on the queue, retrying while the queue is full.

        The one second timeout exists so that `quitting` is re-checked
        regularly instead of blocking forever on a full queue.

        :return: True when the value was queued, False when we gave up
            because the async generator is being closed.
        """
        while True:
            try:
                q.put(value, timeout=1)
            except Full:
                if quitting:
                    return False
            else:
                return True

    def runner() -> None:
        """
        Consume the generator in background thread.
        When items are received, they'll be pushed to the queue.
        """
        try:
            for item in get_iterable():
                # When this async generator was cancelled (closed), stop this
                # thread.
                if quitting or not put_unless_quitting(item):
                    return
        finally:
            # Always try to signal the end of the stream, so that a consumer
            # blocked on `q.get` wakes up. Deliberately no `return` here: a
            # `return` inside `finally` would silently discard an exception
            # raised by the iterable.
            put_unless_quitting(_Done())

    # Start background thread.
    runner_f = run_in_executor_with_context(runner)

    try:
        while True:
            # Fast path: take an already buffered item without leaving the
            # event loop. Only when the queue is empty, wait for the next item
            # with a blocking `q.get` in the default executor.
            try:
                item = q.get_nowait()
            except Empty:
                item = await loop.run_in_executor(None, q.get)
            if isinstance(item, _Done):
                break
            else:
                yield item
    finally:
        # When this async generator is closed (GeneratorExit exception), stop
        # the background thread as well - we don't need that anymore.
        quitting = True

        # Wait for the background thread to finish. (should happen right after
        # the last item is yielded).
        # NOTE(review): presumably this also re-raises an exception that
        # escaped `runner` - confirm against `run_in_executor_with_context`.
        await runner_f