1""" 
    2Implementation for async generators. 
    3""" 
    4 
    5from __future__ import annotations 
    6 
    7from asyncio import get_running_loop 
    8from contextlib import asynccontextmanager 
    9from queue import Empty, Full, Queue 
    10from typing import Any, AsyncGenerator, Callable, Iterable, TypeVar 
    11 
    12from .utils import run_in_executor_with_context 
    13 
# Names exported by `from <this module> import *`.
__all__ = [
    "aclosing",
    "generator_to_async_generator",
]

# Type variable used by `aclosing`: bound to async generators, so the context
# manager is annotated as yielding the same type it was given.
_T_Generator = TypeVar("_T_Generator", bound=AsyncGenerator[Any, None])
    20 
    21 
    22@asynccontextmanager 
    23async def aclosing( 
    24    thing: _T_Generator, 
    25) -> AsyncGenerator[_T_Generator, None]: 
    26    "Similar to `contextlib.aclosing`, in Python 3.10." 
    27    try: 
    28        yield thing 
    29    finally: 
    30        await thing.aclose() 
    31 
    32 
    33# By default, choose a buffer size that's a good balance between having enough 
    34# throughput, but not consuming too much memory. We use this to consume a sync 
    35# generator of completions as an async generator. If the queue size is very 
    36# small (like 1), consuming the completions goes really slow (when there are a 
    37# lot of items). If the queue size would be unlimited or too big, this can 
    38# cause overconsumption of memory, and cause CPU time spent producing items 
    39# that are no longer needed (if the consumption of the async generator stops at 
    40# some point). We need a fixed size in order to get some back pressure from the 
    41# async consumer to the sync producer. We choose 1000 by default here. If we 
    42# have around 50k completions, measurements show that 1000 is still 
    43# significantly faster than a buffer of 100. 
DEFAULT_BUFFER_SIZE: int = 1000

# Item type produced by the iterable that `generator_to_async_generator`
# wraps, and therefore also by the resulting async generator.
_T = TypeVar("_T")
    47 
    48 
    49class _Done: 
    50    pass 
    51 
    52 
    53async def generator_to_async_generator( 
    54    get_iterable: Callable[[], Iterable[_T]], 
    55    buffer_size: int = DEFAULT_BUFFER_SIZE, 
    56) -> AsyncGenerator[_T, None]: 
    57    """ 
    58    Turn a generator or iterable into an async generator. 
    59 
    60    This works by running the generator in a background thread. 
    61 
    62    :param get_iterable: Function that returns a generator or iterable when 
    63        called. 
    64    :param buffer_size: Size of the queue between the async consumer and the 
    65        synchronous generator that produces items. 
    66    """ 
    67    quitting = False 
    68    # NOTE: We are limiting the queue size in order to have back-pressure. 
    69    q: Queue[_T | _Done] = Queue(maxsize=buffer_size) 
    70    loop = get_running_loop() 
    71 
    72    def runner() -> None: 
    73        """ 
    74        Consume the generator in background thread. 
    75        When items are received, they'll be pushed to the queue. 
    76        """ 
    77        try: 
    78            for item in get_iterable(): 
    79                # When this async generator was cancelled (closed), stop this 
    80                # thread. 
    81                if quitting: 
    82                    return 
    83 
    84                while True: 
    85                    try: 
    86                        q.put(item, timeout=1) 
    87                    except Full: 
    88                        if quitting: 
    89                            return 
    90                        continue 
    91                    else: 
    92                        break 
    93 
    94        finally: 
    95            while True: 
    96                try: 
    97                    q.put(_Done(), timeout=1) 
    98                except Full: 
    99                    if quitting: 
    100                        return 
    101                    continue 
    102                else: 
    103                    break 
    104 
    105    # Start background thread. 
    106    runner_f = run_in_executor_with_context(runner) 
    107 
    108    try: 
    109        while True: 
    110            try: 
    111                item = q.get_nowait() 
    112            except Empty: 
    113                item = await loop.run_in_executor(None, q.get) 
    114            if isinstance(item, _Done): 
    115                break 
    116            else: 
    117                yield item 
    118    finally: 
    119        # When this async generator is closed (GeneratorExit exception, stop 
    120        # the background thread as well. - we don't need that anymore.) 
    121        quitting = True 
    122 
    123        # Wait for the background thread to finish. (should happen right after 
    124        # the last item is yielded). 
    125        await runner_f