1"""
2Implementation for async generators.
3"""
4
5from __future__ import annotations
6
7from asyncio import get_running_loop
8from collections.abc import AsyncGenerator, Callable, Iterable
9from contextlib import asynccontextmanager
10from queue import Empty, Full, Queue
11from typing import Any, TypeVar
12
13from .utils import run_in_executor_with_context
14
15__all__ = [
16 "aclosing",
17 "generator_to_async_generator",
18]
19
20_T_Generator = TypeVar("_T_Generator", bound=AsyncGenerator[Any, None])
21
22
23@asynccontextmanager
24async def aclosing(
25 thing: _T_Generator,
26) -> AsyncGenerator[_T_Generator, None]:
27 "Similar to `contextlib.aclosing`, in Python 3.10."
28 try:
29 yield thing
30 finally:
31 await thing.aclose()
32
33
34# By default, choose a buffer size that's a good balance between having enough
35# throughput, but not consuming too much memory. We use this to consume a sync
36# generator of completions as an async generator. If the queue size is very
37# small (like 1), consuming the completions goes really slow (when there are a
38# lot of items). If the queue size would be unlimited or too big, this can
39# cause overconsumption of memory, and cause CPU time spent producing items
40# that are no longer needed (if the consumption of the async generator stops at
41# some point). We need a fixed size in order to get some back pressure from the
42# async consumer to the sync producer. We choose 1000 by default here. If we
43# have around 50k completions, measurements show that 1000 is still
44# significantly faster than a buffer of 100.
45DEFAULT_BUFFER_SIZE: int = 1000
46
47_T = TypeVar("_T")
48
49
50class _Done:
51 pass
52
53
54async def generator_to_async_generator(
55 get_iterable: Callable[[], Iterable[_T]],
56 buffer_size: int = DEFAULT_BUFFER_SIZE,
57) -> AsyncGenerator[_T, None]:
58 """
59 Turn a generator or iterable into an async generator.
60
61 This works by running the generator in a background thread.
62
63 :param get_iterable: Function that returns a generator or iterable when
64 called.
65 :param buffer_size: Size of the queue between the async consumer and the
66 synchronous generator that produces items.
67 """
68 quitting = False
69 # NOTE: We are limiting the queue size in order to have back-pressure.
70 q: Queue[_T | _Done] = Queue(maxsize=buffer_size)
71 loop = get_running_loop()
72
73 def runner() -> None:
74 """
75 Consume the generator in background thread.
76 When items are received, they'll be pushed to the queue.
77 """
78 try:
79 for item in get_iterable():
80 # When this async generator was cancelled (closed), stop this
81 # thread.
82 if quitting:
83 return
84
85 while True:
86 try:
87 q.put(item, timeout=1)
88 except Full:
89 if quitting:
90 return
91 continue
92 else:
93 break
94
95 finally:
96 while True:
97 try:
98 q.put(_Done(), timeout=1)
99 except Full:
100 if quitting:
101 return
102 continue
103 else:
104 break
105
106 # Start background thread.
107 runner_f = run_in_executor_with_context(runner)
108
109 try:
110 while True:
111 try:
112 item = q.get_nowait()
113 except Empty:
114 item = await loop.run_in_executor(None, q.get)
115 if isinstance(item, _Done):
116 break
117 else:
118 yield item
119 finally:
120 # When this async generator is closed (GeneratorExit exception, stop
121 # the background thread as well. - we don't need that anymore.)
122 quitting = True
123
124 # Wait for the background thread to finish. (should happen right after
125 # the last item is yielded).
126 await runner_f