Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/prompt_toolkit/eventloop/async_generator.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

55 statements  

1""" 

2Implementation for async generators. 

3""" 

4 

5from __future__ import annotations 

6 

7from asyncio import get_running_loop 

8from contextlib import asynccontextmanager 

9from queue import Empty, Full, Queue 

10from typing import Any, AsyncGenerator, Callable, Iterable, TypeVar 

11 

12from .utils import run_in_executor_with_context 

13 

14__all__ = [ 

15 "aclosing", 

16 "generator_to_async_generator", 

17] 

18 

19_T_Generator = TypeVar("_T_Generator", bound=AsyncGenerator[Any, None]) 

20 

21 

22@asynccontextmanager 

23async def aclosing( 

24 thing: _T_Generator, 

25) -> AsyncGenerator[_T_Generator, None]: 

26 "Similar to `contextlib.aclosing`, in Python 3.10." 

27 try: 

28 yield thing 

29 finally: 

30 await thing.aclose() 

31 

32 

33# By default, choose a buffer size that's a good balance between having enough 

34# throughput, but not consuming too much memory. We use this to consume a sync 

35# generator of completions as an async generator. If the queue size is very 

36# small (like 1), consuming the completions goes really slow (when there are a 

37# lot of items). If the queue size would be unlimited or too big, this can 

38# cause overconsumption of memory, and cause CPU time spent producing items 

39# that are no longer needed (if the consumption of the async generator stops at 

40# some point). We need a fixed size in order to get some back pressure from the 

41# async consumer to the sync producer. We choose 1000 by default here. If we 

42# have around 50k completions, measurements show that 1000 is still 

43# significantly faster than a buffer of 100. 

44DEFAULT_BUFFER_SIZE: int = 1000 

45 

46_T = TypeVar("_T") 

47 

48 

49class _Done: 

50 pass 

51 

52 

53async def generator_to_async_generator( 

54 get_iterable: Callable[[], Iterable[_T]], 

55 buffer_size: int = DEFAULT_BUFFER_SIZE, 

56) -> AsyncGenerator[_T, None]: 

57 """ 

58 Turn a generator or iterable into an async generator. 

59 

60 This works by running the generator in a background thread. 

61 

62 :param get_iterable: Function that returns a generator or iterable when 

63 called. 

64 :param buffer_size: Size of the queue between the async consumer and the 

65 synchronous generator that produces items. 

66 """ 

67 quitting = False 

68 # NOTE: We are limiting the queue size in order to have back-pressure. 

69 q: Queue[_T | _Done] = Queue(maxsize=buffer_size) 

70 loop = get_running_loop() 

71 

72 def runner() -> None: 

73 """ 

74 Consume the generator in background thread. 

75 When items are received, they'll be pushed to the queue. 

76 """ 

77 try: 

78 for item in get_iterable(): 

79 # When this async generator was cancelled (closed), stop this 

80 # thread. 

81 if quitting: 

82 return 

83 

84 while True: 

85 try: 

86 q.put(item, timeout=1) 

87 except Full: 

88 if quitting: 

89 return 

90 continue 

91 else: 

92 break 

93 

94 finally: 

95 while True: 

96 try: 

97 q.put(_Done(), timeout=1) 

98 except Full: 

99 if quitting: 

100 return 

101 continue 

102 else: 

103 break 

104 

105 # Start background thread. 

106 runner_f = run_in_executor_with_context(runner) 

107 

108 try: 

109 while True: 

110 try: 

111 item = q.get_nowait() 

112 except Empty: 

113 item = await loop.run_in_executor(None, q.get) 

114 if isinstance(item, _Done): 

115 break 

116 else: 

117 yield item 

118 finally: 

119 # When this async generator is closed (GeneratorExit exception, stop 

120 # the background thread as well. - we don't need that anymore.) 

121 quitting = True 

122 

123 # Wait for the background thread to finish. (should happen right after 

124 # the last item is yielded). 

125 await runner_f