Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/prompt_toolkit/eventloop/async_generator.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

56 statements  

1""" 

2Implementation for async generators. 

3""" 

4 

5from __future__ import annotations 

6 

7from asyncio import get_running_loop 

8from collections.abc import AsyncGenerator, Callable, Iterable 

9from contextlib import asynccontextmanager 

10from queue import Empty, Full, Queue 

11from typing import Any, TypeVar 

12 

13from .utils import run_in_executor_with_context 

14 

# Public API of this module.
__all__ = [
    "aclosing",
    "generator_to_async_generator",
]

19 

# Type variable for the async generator wrapped by `aclosing`; bound so that
# `.aclose()` is guaranteed to exist on the managed object.
_T_Generator = TypeVar("_T_Generator", bound=AsyncGenerator[Any, None])

21 

22 

@asynccontextmanager
async def aclosing(
    thing: _T_Generator,
) -> AsyncGenerator[_T_Generator, None]:
    """
    Async context manager that guarantees *thing* is closed on exit.

    Backport of `contextlib.aclosing` (added in Python 3.10): yields the
    given async generator unchanged, and awaits its ``aclose()`` when the
    ``async with`` block is left — whether normally or via an exception.
    """
    try:
        yield thing
    finally:
        # Always release the generator's resources, even on error/cancel.
        await thing.aclose()

32 

33 

# By default, choose a buffer size that's a good balance between having enough
# throughput, but not consuming too much memory. We use this to consume a sync
# generator of completions as an async generator. If the queue size is very
# small (like 1), consuming the completions goes really slow (when there are a
# lot of items). If the queue size would be unlimited or too big, this can
# cause overconsumption of memory, and cause CPU time spent producing items
# that are no longer needed (if the consumption of the async generator stops at
# some point). We need a fixed size in order to get some back pressure from the
# async consumer to the sync producer. We choose 1000 by default here. If we
# have around 50k completions, measurements show that 1000 is still
# significantly faster than a buffer of 100.
DEFAULT_BUFFER_SIZE: int = 1000

# Type variable for the items produced by the wrapped iterable in
# `generator_to_async_generator`.
_T = TypeVar("_T")

48 

49 

class _Done:
    """
    Sentinel type. An instance is pushed onto the queue by the producer
    thread when the wrapped iterable is exhausted; the async consumer stops
    iterating when it receives an item of this type.
    """

    pass

52 

53 

async def generator_to_async_generator(
    get_iterable: Callable[[], Iterable[_T]],
    buffer_size: int = DEFAULT_BUFFER_SIZE,
) -> AsyncGenerator[_T, None]:
    """
    Turn a generator or iterable into an async generator.

    This works by running the generator in a background thread.

    :param get_iterable: Function that returns a generator or iterable when
        called.
    :param buffer_size: Size of the queue between the async consumer and the
        synchronous generator that produces items.
    """
    # Flag read by the producer thread (via closure) to learn that the
    # consumer is gone. It is only ever flipped False -> True, so a plain
    # bool is sufficient — no lock needed.
    quitting = False
    # NOTE: We are limiting the queue size in order to have back-pressure.
    q: Queue[_T | _Done] = Queue(maxsize=buffer_size)
    loop = get_running_loop()

    def runner() -> None:
        """
        Consume the generator in background thread.
        When items are received, they'll be pushed to the queue.
        """
        try:
            for item in get_iterable():
                # When this async generator was cancelled (closed), stop this
                # thread.
                if quitting:
                    return

                # Put with a 1-second timeout rather than blocking forever:
                # if the queue stays full (consumer stalled or gone), we wake
                # up periodically to re-check `quitting` so the thread can
                # exit instead of hanging.
                while True:
                    try:
                        q.put(item, timeout=1)
                    except Full:
                        if quitting:
                            return
                        continue
                    else:
                        break

        finally:
            # Always signal completion (normal exhaustion or an exception in
            # the iterable) by enqueueing the `_Done` sentinel, using the same
            # timeout/poll pattern so we never block forever on a full queue.
            while True:
                try:
                    q.put(_Done(), timeout=1)
                except Full:
                    if quitting:
                        return
                    continue
                else:
                    break

    # Start background thread.
    runner_f = run_in_executor_with_context(runner)

    try:
        while True:
            # Fast path: take an item without blocking the event loop.
            try:
                item = q.get_nowait()
            except Empty:
                # Queue empty: do the blocking `q.get` on an executor thread
                # so the event loop stays responsive while we wait.
                item = await loop.run_in_executor(None, q.get)
            if isinstance(item, _Done):
                break
            else:
                yield item
    finally:
        # When this async generator is closed (GeneratorExit raised into it),
        # tell the background thread to stop — its output is no longer needed.
        quitting = True

        # Wait for the background thread to finish. (Should happen right
        # after the last item is yielded, since the thread exits once it sees
        # `quitting` or has pushed the `_Done` sentinel.)
        await runner_f