1"""
2Implementation for async generators.
3"""
4
5from __future__ import annotations
6
7from asyncio import get_running_loop
8from contextlib import asynccontextmanager
9from queue import Empty, Full, Queue
10from typing import Any, AsyncGenerator, Callable, Iterable, TypeVar
11
12from .utils import run_in_executor_with_context
13
14__all__ = [
15 "aclosing",
16 "generator_to_async_generator",
17]
18
19_T_Generator = TypeVar("_T_Generator", bound=AsyncGenerator[Any, None])
20
21
22@asynccontextmanager
23async def aclosing(
24 thing: _T_Generator,
25) -> AsyncGenerator[_T_Generator, None]:
26 "Similar to `contextlib.aclosing`, in Python 3.10."
27 try:
28 yield thing
29 finally:
30 await thing.aclose()
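
# Example usage of `aclosing` (a sketch only; `some_async_generator` is a
# hypothetical async generator, not part of this module). The `async with`
# block guarantees that `aclose()` is awaited even when the consumer breaks
# out of the loop early:
#
#     async with aclosing(some_async_generator()) as agen:
#         async for item in agen:
#             if item is None:  # Illustrative early-exit condition.
#                 break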


# By default, choose a buffer size that's a good balance between throughput
# and memory usage. We use this to consume a sync generator of completions as
# an async generator. If the queue size is very small (like 1), consuming the
# completions becomes really slow when there are a lot of items. If the queue
# were unbounded or too big, it could consume too much memory and waste CPU
# time producing items that are no longer needed (if consumption of the async
# generator stops at some point). We need a fixed size in order to get some
# back pressure from the async consumer to the sync producer. We choose 1000
# by default here. With around 50k completions, measurements show that a
# buffer of 1000 is still significantly faster than a buffer of 100.
DEFAULT_BUFFER_SIZE: int = 1000
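
# For example (illustrative only; `get_completions` is a placeholder for any
# function returning a sync iterable), a caller that expects to consume only a
# few items could pass a smaller buffer to avoid producing work that gets
# thrown away:
#
#     agen = generator_to_async_generator(get_completions, buffer_size=50)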

_T = TypeVar("_T")


# Sentinel value used to signal the end of the stream.
class _Done:
    pass


async def generator_to_async_generator(
    get_iterable: Callable[[], Iterable[_T]],
    buffer_size: int = DEFAULT_BUFFER_SIZE,
) -> AsyncGenerator[_T, None]:
    """
    Turn a generator or iterable into an async generator.

    This works by running the generator in a background thread.

    :param get_iterable: Function that returns a generator or iterable when
        called.
    :param buffer_size: Size of the queue between the async consumer and the
        synchronous generator that produces items.
    """
    quitting = False
    # NOTE: We are limiting the queue size in order to have back-pressure.
    q: Queue[_T | _Done] = Queue(maxsize=buffer_size)
    loop = get_running_loop()

    def runner() -> None:
        """
        Consume the generator in a background thread.
        When items are received, they'll be pushed to the queue.
        """
        try:
            for item in get_iterable():
                # When this async generator is cancelled (closed), stop this
                # thread.
                if quitting:
                    return

                # Block until there is room in the queue. Wake up every second
                # to check whether the consumer went away in the meantime.
                while True:
                    try:
                        q.put(item, timeout=1)
                    except Full:
                        if quitting:
                            return
                        continue
                    else:
                        break

        finally:
            # Always try to push the sentinel, so that the async consumer
            # knows the producer has finished (exhausted, failed, or stopped).
            while True:
                try:
                    q.put(_Done(), timeout=1)
                except Full:
                    if quitting:
                        return
                    continue
                else:
                    break

    # Start the producer in a background thread.
    runner_f = run_in_executor_with_context(runner)

    try:
        while True:
            try:
                # Fast path: take an item without leaving the event loop.
                item = q.get_nowait()
            except Empty:
                # The queue is empty: wait for the next item in an executor
                # thread, so the blocking `q.get()` call doesn't block the
                # event loop.
                item = await loop.run_in_executor(None, q.get)
            if isinstance(item, _Done):
                break
            else:
                yield item
    finally:
        # When this async generator is closed (GeneratorExit exception), stop
        # the background thread as well; we don't need it anymore.
        quitting = True

        # Wait for the background thread to finish. (This should happen right
        # after the last item is yielded.)
        await runner_f
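

# Minimal usage sketch (illustrative only; `get_numbers` and `main` are
# hypothetical names, not part of this module):
#
#     def get_numbers() -> Iterable[int]:
#         # A plain, blocking generator function.
#         yield from range(10_000)
#
#     async def main() -> None:
#         agen = generator_to_async_generator(get_numbers)
#         async with aclosing(agen) as items:
#             async for number in items:
#                 print(number)
#
# Wrapping the result in `aclosing()` ensures that `aclose()` is awaited when
# the consumer stops early, which sets `quitting = True` and lets the
# background thread exit.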